From ac4fa22c47d0dbaf5141671894d278a23cb2bca3 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 2 Jun 2023 19:22:45 +0300
Subject: [PATCH 1/6] refactor: extract ddl_create_space_on_master function
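
The space creation logic was duplicated between proc_apply_schema_change and
Clusterwide::apply_ddl_changes_on_replicaset_master. This extracts it into a
single ddl_create_space_on_master function in src/storage.rs, which returns
Ok(None) on success, Ok(Some(abort_reason)) for errors that must abort the
ddl, and Err(e) for retryable errors. Setting the local schema version is
moved out of the helper and into the caller.

Below is a minimal, self-contained sketch (placeholder types, not the real
picodata API) of that calling convention, only to illustrate how a caller
tells the two error kinds apart:

    // Hypothetical stand-ins for the real error types.
    type AbortReason = String;
    type RetryableError = String;

    // Mimics the shape of ddl_create_space_on_master: retryable failures
    // come back as Err(e), non-retryable ones as Ok(Some(abort_reason)).
    fn create_space_sketch(definition_found: bool) -> Result<Option<AbortReason>, RetryableError> {
        if !definition_found {
            // The definition hasn't arrived yet: the caller should retry.
            return Err("space definition not found".into());
        }
        // Simulate the inner transaction failing with a schema error.
        let res: Result<(), AbortReason> = Err("parts list cannot be empty".into());
        Ok(res.err())
    }

    fn main() {
        match create_space_sketch(true) {
            Err(e) => println!("retry later: {e}"),
            Ok(Some(reason)) => println!("abort ddl: {reason}"),
            Ok(None) => println!("space created"),
        }
    }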

---
 src/rpc/ddl_apply.rs |  57 +++++------------------
 src/storage.rs       | 106 ++++++++++++++++++++++++++++++++-----------
 2 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index f874d8e4be..80ee2c8111 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -1,4 +1,5 @@
 use crate::op::Ddl;
+use crate::storage::ddl_create_space_on_master;
 use crate::storage::Clusterwide;
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::tlog;
@@ -9,7 +10,6 @@ use crate::traft::{RaftIndex, RaftTerm};
 use std::time::Duration;
 use tarantool::error::{TarantoolError, TarantoolErrorCode};
 use tarantool::ffi::tarantool as ffi;
-use tarantool::space::{Space, SystemSpace};
 
 crate::define_rpc_request! {
     fn proc_apply_schema_change(req: Request) -> Result<Response> {
@@ -82,54 +82,11 @@ crate::define_rpc_request! {
 // TODO: move this to crate::schema maybe?
 pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
-    let sys_space = Space::from(SystemSpace::Space);
-    let sys_index = Space::from(SystemSpace::Index);
 
     match *ddl {
         Ddl::CreateSpace { id, .. } => {
-            let pico_space_def = storage
-                .spaces
-                .get(id)?
-                .ok_or_else(|| Error::other(format!("space with id #{id} not found")))?;
-            // TODO: set defaults
-            let tt_space_def = pico_space_def.to_space_metadata()?;
-
-            let pico_pk_def = storage.indexes.get(id, 0)?.ok_or_else(|| {
-                Error::other(format!(
-                    "primary index for space {} not found",
-                    pico_space_def.name
-                ))
-            })?;
-            let tt_pk_def = pico_pk_def.to_index_metadata();
-
-            // For now we just assume that during space creation index with id 1
-            // exists if and only if it is a bucket_id index.
-            let mut tt_bucket_id_def = None;
-            let pico_bucket_id_def = storage.indexes.get(id, 1)?;
-            if let Some(def) = &pico_bucket_id_def {
-                tt_bucket_id_def = Some(def.to_index_metadata());
-            }
-
-            let res = (|| -> tarantool::Result<()> {
-                if tt_pk_def.parts.is_empty() {
-                    return Err(tarantool::set_and_get_error!(
-                        tarantool::error::TarantoolErrorCode::ModifyIndex,
-                        "can't create index '{}' in space '{}': parts list cannot be empty",
-                        tt_pk_def.name,
-                        tt_space_def.name,
-                    )
-                    .into());
-                }
-                sys_space.insert(&tt_space_def)?;
-                sys_index.insert(&tt_pk_def)?;
-                if let Some(def) = tt_bucket_id_def {
-                    sys_index.insert(&def)?;
-                }
-                set_local_schema_version(version)?;
-
-                Ok(())
-            })();
-            if let Err(e) = res {
+            let abort_reason = ddl_create_space_on_master(storage, id)?;
+            if let Some(e) = abort_reason {
                 // We return Ok(error) because currently this is the only
                 // way to report an application level error.
                 return Ok(Response::Abort {
@@ -142,5 +99,13 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re
         }
     }
 
+    if let Err(e) = set_local_schema_version(version) {
+        // We return Ok(error) because currently this is the only
+        // way to report an application level error.
+        return Ok(Response::Abort {
+            reason: e.to_string(),
+        });
+    }
+
     Ok(Response::Ok)
 }
diff --git a/src/storage.rs b/src/storage.rs
index 2db24dfcb9..bde42a04e5 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -591,9 +591,6 @@ impl Clusterwide {
     }
 
     pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> {
-        let sys_space = Space::from(SystemSpace::Space);
-        let sys_index = Space::from(SystemSpace::Index);
-
         for space_def in self.spaces.iter()? {
             if !space_def.operable {
                 // If it so happens, that we receive an unfinished schema change via snapshot,
@@ -602,29 +599,12 @@ impl Clusterwide {
                 continue;
             }
 
-            let Some(pk_def) = self.indexes.get(space_def.id, 0)? else {
-                crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}");
-                continue;
-            };
-
-            // For now we just assume that during space creation index with id 1
-            // exists if and only if it is a bucket_id index.
-            let bucket_id_def = self.indexes.get(space_def.id, 1)?;
-
-            // XXX: this logic is duplicated in proc_apply_schema_change, but
-            // the code is so small, it doesn't seem forth it extracting it for
-            // now
-            let tt_space_def = space_def.to_space_metadata()?;
-            let tt_pk_def = pk_def.to_index_metadata();
-            let mut tt_bucket_id_def = None;
-            if let Some(def) = &bucket_id_def {
-                tt_bucket_id_def = Some(def.to_index_metadata());
-            }
-
-            sys_space.replace(&tt_space_def)?;
-            sys_index.replace(&tt_pk_def)?;
-            if let Some(def) = tt_bucket_id_def {
-                sys_index.replace(&def)?;
+            let abort_reason = ddl_create_space_on_master(self, space_def.id)?;
+            if let Some(e) = abort_reason {
+                return Err(Error::other(format!(
+                    "failed to create space {}: {e}",
+                    space_def.id
+                )));
             }
         }
 
@@ -1621,6 +1601,80 @@ impl Indexes {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// ddl
+////////////////////////////////////////////////////////////////////////////////
+
+/// Create tarantool space and any required indexes. Currently it creates a
+/// primary index and a `bucket_id` index if it's a sharded space.
+///
+/// Return values:
+/// * `Ok(None)` in case of success.
+/// * `Ok(Some(abort_reason))` in case of an error which should result in a ddl abort.
+/// * `Err(e)` in case of retryable errors.
+///
+// FIXME: this function returns 2 kinds of errors: retryable and non-retryable.
+// Currently this is implemented by returning one kind of error as Err(e) and
+// the other as Ok(Some(e)). This was the simplest solution at the time this
+// function was implemented, as it requires the least amount of boilerplate and
+// error forwarding code. But this signature is not intuitive, so maybe there's
+// room for improvement.
+pub fn ddl_create_space_on_master(
+    storage: &Clusterwide,
+    space_id: SpaceId,
+) -> traft::Result<Option<TntError>> {
+    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+    let sys_space = Space::from(SystemSpace::Space);
+    let sys_index = Space::from(SystemSpace::Index);
+
+    let pico_space_def = storage
+        .spaces
+        .get(space_id)?
+        .ok_or_else(|| Error::other(format!("space with id {space_id} not found")))?;
+    // TODO: set defaults
+    let tt_space_def = pico_space_def.to_space_metadata()?;
+
+    let pico_pk_def = storage.indexes.get(space_id, 0)?.ok_or_else(|| {
+        Error::other(format!(
+            "primary index for space {} not found",
+            pico_space_def.name
+        ))
+    })?;
+    let tt_pk_def = pico_pk_def.to_index_metadata();
+
+    // For now we just assume that during space creation index with id 1
+    // exists if and only if it is a bucket_id index.
+    let mut tt_bucket_id_def = None;
+    let pico_bucket_id_def = storage.indexes.get(space_id, 1)?;
+    if let Some(def) = &pico_bucket_id_def {
+        tt_bucket_id_def = Some(def.to_index_metadata());
+    }
+
+    let res = (|| -> tarantool::Result<()> {
+        if tt_pk_def.parts.is_empty() {
+            return Err(tarantool::set_and_get_error!(
+                tarantool::error::TarantoolErrorCode::ModifyIndex,
+                "can't create index '{}' in space '{}': parts list cannot be empty",
+                tt_pk_def.name,
+                tt_space_def.name,
+            )
+            .into());
+        }
+        sys_space.insert(&tt_space_def)?;
+        sys_index.insert(&tt_pk_def)?;
+        if let Some(def) = tt_bucket_id_def {
+            sys_index.insert(&def)?;
+        }
+
+        Ok(())
+    })();
+    Ok(res.err())
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// local schema version
+////////////////////////////////////////////////////////////////////////////////
+
 pub fn local_schema_version() -> tarantool::Result<u64> {
     let space_schema = Space::from(SystemSpace::Schema);
     let tuple = space_schema.get(&["local_schema_version"])?;
-- 
GitLab


From a0ec420193011dc83c98b47305133b0c32663a05 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 30 May 2023 18:10:16 +0300
Subject: [PATCH 2/6] feat: implement ddl drop space (snapshot not implemented
 yet)
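
Dropping a space is now supported end to end: apply_schema_change gains an
is_commit flag because the space is physically dropped only on DdlCommit, so
there is nothing to undo on abort; ddl_abort_on_master moves from
src/schema.rs to src/storage.rs next to the other ddl helpers;
ddl_drop_space_on_master removes the space's secondary indexes, then its
primary key, then its _truncate and _space records; and NodeImpl updates the
picodata metadata (operable flags, space and index definitions) on
prepare/commit/abort using the new Indexes::by_space_id iterator.

A toy, self-contained sketch (plain strings, not the tarantool system spaces)
of the deletion order used on the master:

    // Returns the order in which the system-space records would be removed
    // for a space with the given index ids (0 is always the primary key).
    fn drop_order(space_id: u32, index_ids: &[u32]) -> Vec<String> {
        let mut steps = Vec::new();
        // Secondary indexes first, in reverse id order.
        for iid in index_ids.iter().filter(|&&iid| iid != 0).rev() {
            steps.push(format!("delete _index[{space_id}, {iid}]"));
        }
        // The primary key must be dropped last among the indexes.
        steps.push(format!("delete _index[{space_id}, 0]"));
        steps.push(format!("delete _truncate[{space_id}]"));
        steps.push(format!("delete _space[{space_id}]"));
        steps
    }

    fn main() {
        for step in drop_order(1027, &[0, 1, 2]) {
            println!("{step}");
        }
    }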

---
 src/rpc/ddl_apply.rs | 44 ++++++++++++++++++++--
 src/schema.rs        | 23 +-----------
 src/storage.rs       | 89 ++++++++++++++++++++++++++++++++++++++++++++
 src/traft/node.rs    | 61 ++++++++++++++++++++++++++++--
 4 files changed, 188 insertions(+), 29 deletions(-)

diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index 80ee2c8111..1bf886b4c5 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -1,6 +1,6 @@
 use crate::op::Ddl;
-use crate::storage::ddl_create_space_on_master;
 use crate::storage::Clusterwide;
+use crate::storage::{ddl_create_space_on_master, ddl_drop_space_on_master};
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::tlog;
 use crate::traft::error::Error;
@@ -43,7 +43,7 @@ crate::define_rpc_request! {
         // TODO: transaction may have already started, if we're in a process of
         // creating a big index. If governor sends a repeat rpc request to us we
         // should handle this correctly
-        let res = apply_schema_change(storage, &ddl, pending_schema_version);
+        let res = apply_schema_change(storage, &ddl, pending_schema_version, false);
         match res {
             Ok(Response::Abort { .. }) | Err(_) => {
                 let rc = unsafe { ffi::box_txn_rollback() };
@@ -79,8 +79,27 @@ crate::define_rpc_request! {
     }
 }
 
-// TODO: move this to crate::schema maybe?
-pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> {
+/// Applies the schema change described by `ddl` to the tarantool storage. This
+/// function is only called on replicaset masters, other replicas get the
+/// changes via tarantool replication.
+///
+/// In case of a successful schema change the local schema version will be set
+/// to `version`. In case of [`Ddl::DropSpace`] and [`Ddl::DropIndex`] the
+/// schema is only changed if `is_commit` is `true`.
+///
+/// The space and index definitions are extracted from picodata storage via
+/// `storage`.
+///
+/// `is_commit` is `true` if the schema change is being applied in response to
+/// a [`DdlCommit`] raft entry, otherwise it's `false`.
+///
+/// [`DdlCommit`]: crate::traft::op::Op::DdlCommit
+pub fn apply_schema_change(
+    storage: &Clusterwide,
+    ddl: &Ddl,
+    version: u64,
+    is_commit: bool,
+) -> Result<Response> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
 
     match *ddl {
@@ -94,6 +113,23 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re
                 });
             }
         }
+
+        Ddl::DropSpace { id } => {
+            if !is_commit {
+                // Space is only dropped on commit.
+                return Ok(Response::Ok);
+            }
+
+            let abort_reason = ddl_drop_space_on_master(id)?;
+            if let Some(e) = abort_reason {
+                // We return Ok(error) because currently this is the only
+                // way to report an application level error.
+                return Ok(Response::Abort {
+                    reason: e.to_string(),
+                });
+            }
+        }
+
         _ => {
             todo!();
         }
diff --git a/src/schema.rs b/src/schema.rs
index b69bd451c8..7f79c51037 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -8,7 +8,6 @@ use tarantool::{
     index::{IndexId, Part},
     schema::space::SpaceMetadata,
     space::SpaceId,
-    space::{Space, SystemSpace},
     tlua::{self, LuaRead},
     tuple::Encode,
     util::Value,
@@ -21,7 +20,7 @@ use crate::compare_and_swap;
 use crate::rpc;
 use crate::storage::ToEntryIter;
 use crate::storage::SPACE_ID_INTERNAL_MAX;
-use crate::storage::{set_local_schema_version, Clusterwide, ClusterwideSpaceId, PropertyName};
+use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
 use crate::traft::op::{Ddl, DdlBuilder, Op};
 use crate::traft::{self, event, node, RaftIndex};
 use crate::util::instant_saturating_add;
@@ -162,26 +161,6 @@ impl IndexDef {
     }
 }
 
-pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
-    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
-    let sys_space = Space::from(SystemSpace::Space);
-    let sys_index = Space::from(SystemSpace::Index);
-
-    match *ddl {
-        Ddl::CreateSpace { id, .. } => {
-            sys_index.delete(&[id, 1])?;
-            sys_index.delete(&[id, 0])?;
-            sys_space.delete(&[id])?;
-            set_local_schema_version(version)?;
-        }
-        _ => {
-            todo!();
-        }
-    }
-
-    Ok(())
-}
-
 // TODO: this should be a TryFrom in tarantool-module
 pub fn try_space_field_type_to_index_field_type(
     ft: tarantool::space::FieldType,
diff --git a/src/storage.rs b/src/storage.rs
index bde42a04e5..9326516a7e 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1599,12 +1599,52 @@ impl Indexes {
             None => Ok(None),
         }
     }
+
+    #[inline]
+    pub fn by_space_id(&self, space_id: SpaceId) -> tarantool::Result<EntryIter<IndexDef>> {
+        let iter = self.space.select(IteratorType::Eq, &[space_id])?;
+        Ok(EntryIter::new(iter))
+    }
+}
+
+impl ToEntryIter for Indexes {
+    type Entry = IndexDef;
+
+    #[inline(always)]
+    fn index_iter(&self) -> Result<IndexIterator> {
+        Ok(self.space.select(IteratorType::All, &())?)
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 // ddl
 ////////////////////////////////////////////////////////////////////////////////
 
+pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
+    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+    let sys_space = Space::from(SystemSpace::Space);
+    let sys_index = Space::from(SystemSpace::Index);
+
+    match *ddl {
+        Ddl::CreateSpace { id, .. } => {
+            sys_index.delete(&[id, 1])?;
+            sys_index.delete(&[id, 0])?;
+            sys_space.delete(&[id])?;
+            set_local_schema_version(version)?;
+        }
+
+        Ddl::DropSpace { .. } => {
+            // Actual drop happens only on commit, so there's nothing to abort.
+        }
+
+        _ => {
+            todo!();
+        }
+    }
+
+    Ok(())
+}
+
 /// Create tarantool space and any required indexes. Currently it creates a
 /// primary index and a `bucket_id` index if it's a sharded space.
 ///
@@ -1671,6 +1711,51 @@ pub fn ddl_create_space_on_master(
     Ok(res.err())
 }
 
+/// Drop tarantool space and any entities which depend on it (currently just indexes).
+///
+/// Return values:
+/// * `Ok(None)` in case of success.
+/// * `Ok(Some(abort_reason))` in case of an error which should result in a ddl abort.
+/// * `Err(e)` in case of retryable errors.
+///
+// FIXME: this function returns 2 kinds of errors: retryable and non-retryable.
+// Currently this is implemented by returning one kind of error as Err(e) and
+// the other as Ok(Some(e)). This was the simplest solution at the time this
+// function was implemented, as it requires the least amount of boilerplate and
+// error forwarding code. But this signature is not intuitive, so maybe there's
+// room for improvement.
+pub fn ddl_drop_space_on_master(space_id: SpaceId) -> traft::Result<Option<TntError>> {
+    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+    let sys_space = Space::from(SystemSpace::Space);
+    let sys_index = Space::from(SystemSpace::Index);
+    let sys_truncate = Space::from(SystemSpace::Truncate);
+
+    let iter = sys_index.select(IteratorType::Eq, &[space_id])?;
+    let mut index_ids = Vec::with_capacity(4);
+    for tuple in iter {
+        let index_id: IndexId = tuple
+            .field(1)?
+            .expect("decoding metadata should never fail");
+        // Primary key is handled explicitly.
+        if index_id != 0 {
+            index_ids.push(index_id);
+        }
+    }
+    let res = (|| -> tarantool::Result<()> {
+        // TODO: delete it from _truncate, delete grants
+        for iid in index_ids.iter().rev() {
+            sys_index.delete(&(space_id, iid))?;
+        }
+        // Primary key must be dropped last.
+        sys_index.delete(&(space_id, 0))?;
+        sys_truncate.delete(&[space_id])?;
+        sys_space.delete(&[space_id])?;
+
+        Ok(())
+    })();
+    Ok(res.err())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // local schema version
 ////////////////////////////////////////////////////////////////////////////////
@@ -1693,6 +1778,10 @@ pub fn set_local_schema_version(v: u64) -> tarantool::Result<()> {
     Ok(())
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// max space id
+////////////////////////////////////////////////////////////////////////////////
+
 pub const SPACE_ID_INTERNAL_MAX: u32 = 1024;
 
 /// Updates box.space._schema max_id to start outside the reserved internal
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 1bcc9e19ce..54b666511f 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -12,8 +12,8 @@ use crate::kvcell::KVCell;
 use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::rpc;
-use crate::schema::ddl_abort_on_master;
 use crate::schema::{Distribution, IndexDef, SpaceDef};
+use crate::storage::ddl_abort_on_master;
 use crate::storage::local_schema_version;
 use crate::storage::SnapshotData;
 use crate::storage::ToEntryIter as _;
@@ -858,10 +858,12 @@ impl NodeImpl {
                     if self.is_readonly() {
                         return SleepAndRetry;
                     } else {
+                        // Master applies schema change at this point.
                         let resp = rpc::ddl_apply::apply_schema_change(
                             &self.storage,
                             &ddl,
                             pending_version,
+                            true,
                         )
                         .expect("storage error");
                         match resp {
@@ -895,6 +897,25 @@ impl NodeImpl {
                         // be done, but for now we just ignore the error "no such index"
                         let _ = res;
                     }
+
+                    Ddl::DropSpace { id } => {
+                        self.storage
+                            .spaces
+                            .delete(id)
+                            .expect("storage should never fail");
+                        let iter = self
+                            .storage
+                            .indexes
+                            .by_space_id(id)
+                            .expect("storage should never fail");
+                        for index in iter {
+                            self.storage
+                                .indexes
+                                .delete(index.space_id, index.id)
+                                .expect("storage should never fail");
+                        }
+                    }
+
                     _ => {
                         todo!()
                     }
@@ -939,6 +960,25 @@ impl NodeImpl {
                         self.storage.indexes.delete(id, 0).expect("storage error");
                         self.storage.spaces.delete(id).expect("storage error");
                     }
+
+                    Ddl::DropSpace { id } => {
+                        self.storage
+                            .spaces
+                            .update_operable(id, true)
+                            .expect("storage should never fail");
+                        let iter = self
+                            .storage
+                            .indexes
+                            .by_space_id(id)
+                            .expect("storage should never fail");
+                        for index in iter {
+                            self.storage
+                                .indexes
+                                .update_operable(index.space_id, index.id, true)
+                                .expect("storage should never fail");
+                        }
+                    }
+
                     _ => {
                         todo!()
                     }
@@ -1117,10 +1157,25 @@ impl NodeImpl {
                 let _ = (space_id, index_id, by_fields);
                 todo!();
             }
+
             Ddl::DropSpace { id } => {
-                let _ = id;
-                todo!();
+                self.storage
+                    .spaces
+                    .update_operable(id, false)
+                    .expect("storage should never fail");
+                let iter = self
+                    .storage
+                    .indexes
+                    .by_space_id(id)
+                    .expect("storage should never fail");
+                for index in iter {
+                    self.storage
+                        .indexes
+                        .update_operable(index.space_id, index.id, false)
+                        .expect("storage should never fail");
+                }
             }
+
             Ddl::DropIndex { index_id, space_id } => {
                 let _ = (index_id, space_id);
                 todo!();
-- 
GitLab


From cb7f653d4483d31da54294988ff35aac729444c2 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 31 May 2023 17:45:05 +0300
Subject: [PATCH 3/6] test: add a couple of ddl drop space tests
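
The Instance.cas test helper previously only knew how to build dml operations
(insert/replace/delete). It is generalized to also build a ddl_prepare
operation for drop_space, and integration tests are added covering the happy
path, a partial failure with a sleeping replicaset master, and catch-up via
raft log and via snapshot (on a replica and on a master). Two existing
snapshot tests are renamed to the test_ddl_create_space_* scheme.

For reference, a hedged Rust sketch (placeholder enums mirroring only the
fields the python helper sends, not necessarily the crate's real definitions)
of the operation shape that goes through .proc_cas:

    #[derive(Debug)]
    enum Ddl {
        DropSpace { id: u32 },
    }

    #[derive(Debug)]
    enum Op {
        DdlPrepare { schema_version: u64, ddl: Ddl },
    }

    fn main() {
        // What i1.cas("drop_space", space=...) conceptually submits.
        let op = Op::DdlPrepare {
            schema_version: 3,
            ddl: Ddl::DropSpace { id: 1027 },
        };
        println!("{op:?}");
    }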

---
 test/conftest.py     |  34 ++--
 test/int/test_ddl.py | 458 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 477 insertions(+), 15 deletions(-)

diff --git a/test/conftest.py b/test/conftest.py
index 98b73b72f9..d813c8a661 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -624,9 +624,9 @@ class Instance:
 
     def cas(
         self,
-        dml_kind: Literal["insert", "replace", "delete"],
+        op_kind: Literal["insert", "replace", "delete", "drop_space"],
         space: str | int,
-        tuple: Tuple | List,
+        tuple: Tuple | List | None = None,
         index: int | None = None,
         term: int | None = None,
         ranges: List[CasRange] | None = None,
@@ -666,25 +666,35 @@ class Instance:
             ranges=predicate_ranges,
         )
 
-        if dml_kind in ["insert", "replace"]:
-            dml = dict(
+        if op_kind in ["insert", "replace"]:
+            op = dict(
                 kind="dml",
-                op_kind=dml_kind,
+                op_kind=op_kind,
                 space=space_id,
                 tuple=msgpack.packb(tuple),
             )
-        elif dml_kind == "delete":
-            dml = dict(
+        elif op_kind == "delete":
+            op = dict(
                 kind="dml",
-                op_kind=dml_kind,
+                op_kind=op_kind,
                 space=space_id,
                 key=msgpack.packb(tuple),
             )
+        elif op_kind == "drop_space":
+            op = dict(
+                kind="ddl_prepare",
+                op_kind=op_kind,
+                schema_version=self.next_schema_version(),
+                ddl=dict(
+                    kind="drop_space",
+                    id=space_id,
+                ),
+            )
         else:
-            raise Exception(f"unsupported {dml_kind=}")
+            raise Exception(f"unsupported {op_kind=}")
 
-        eprint(f"CaS:\n  {predicate=}\n  {dml=}")
-        return self.call(".proc_cas", self.cluster_id, predicate, dml)[0]["index"]
+        eprint(f"CaS:\n  {predicate=}\n  {op=}")
+        return self.call(".proc_cas", self.cluster_id, predicate, op)[0]["index"]
 
     def next_schema_version(self) -> int:
         t = self.call("box.space._pico_property:get", "next_schema_version")
@@ -1062,7 +1072,7 @@ class Cluster:
 
     def create_space(self, params: dict, timeout: float = 3.0):
         """
-        Creates a space. Waits for all peers to be aware of it.
+        Creates a space. Waits for all online peers to be aware of it.
         """
         index = self.instances[0].create_space(params, timeout)
         self.raft_wait_index(index, timeout)
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 6b9153ec02..87ae1e3e5e 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -1,4 +1,5 @@
 import pytest
+import time
 from conftest import Cluster, ReturnError
 
 
@@ -12,6 +13,7 @@ def test_ddl_abort(cluster: Cluster):
     # TODO: test manual abort when we have long-running ddls
 
 
+################################################################################
 def test_ddl_create_space_lua(cluster: Cluster):
     i1, i2 = cluster.deploy(instance_count=2)
 
@@ -95,6 +97,7 @@ def test_ddl_create_space_lua(cluster: Cluster):
     assert i2.call("box.space._pico_space:get", space_id) == pico_space_def
 
 
+################################################################################
 def test_ddl_create_space_bulky(cluster: Cluster):
     i1, i2, i3, i4 = cluster.deploy(instance_count=4, init_replication_factor=2)
 
@@ -236,7 +239,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     assert i4.call("box.space._index:get", [space_id, 0]) == tt_pk_def
 
     ############################################################################
-    # A new replicaset catches up after the fact successfully
+    # A new replicaset boots up after the fact successfully
 
     i5 = cluster.add_instance(wait_online=True, replicaset_id="r3")
 
@@ -257,7 +260,10 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     assert i6.call("box.space._space:get", space_id) == tt_space_def
     assert i6.call("box.space._index:get", [space_id, 0]) == tt_pk_def
 
+    # TODO: test replica becoming master in the process of catching up
 
+
+################################################################################
 def test_ddl_create_sharded_space(cluster: Cluster):
     i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2)
 
@@ -369,6 +375,7 @@ def test_ddl_create_sharded_space(cluster: Cluster):
     assert i2.call("box.space._index:get", [space_id, 1]) == tt_bucket_id_def
 
 
+################################################################################
 def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster):
     i1, i2, i3 = cluster.deploy(instance_count=3)
 
@@ -422,6 +429,7 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster):
         assert i.eval("return box.space._pico_space:get(...).operable", space_id)
 
 
+################################################################################
 def test_ddl_create_space_partial_failure(cluster: Cluster):
     # i2 & i3 are for quorum
     i1, i2, i3, i4, i5 = cluster.deploy(instance_count=5)
@@ -490,6 +498,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
     assert i4.call("box.space._space:get", space_id) is not None
 
 
+################################################################################
 def test_successful_wakeup_after_ddl(cluster: Cluster):
     # Manual replicaset distribution.
     i1 = cluster.add_instance(replicaset_id="r1", wait_online=True)
@@ -526,7 +535,8 @@ def test_successful_wakeup_after_ddl(cluster: Cluster):
     assert i3.call("box.space._space:get", space_id) is not None
 
 
-def test_ddl_from_snapshot_at_boot(cluster: Cluster):
+################################################################################
+def test_ddl_create_space_from_snapshot_at_boot(cluster: Cluster):
     # Second instance is only for quorum
     i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2)
 
@@ -603,7 +613,8 @@ def test_ddl_from_snapshot_at_boot(cluster: Cluster):
     assert i4.call("box.space._schema:get", "local_schema_version")[1] == 1
 
 
-def test_ddl_from_snapshot_at_catchup(cluster: Cluster):
+################################################################################
+def test_ddl_create_space_from_snapshot_at_catchup(cluster: Cluster):
     # Second instance is only for quorum
     i1 = cluster.add_instance(wait_online=True, replicaset_id="r1")
     i2 = cluster.add_instance(wait_online=True, replicaset_id="R2")
@@ -665,6 +676,7 @@ def test_ddl_from_snapshot_at_catchup(cluster: Cluster):
     assert i3.call("box.space._schema:get", "local_schema_version")[1] == 1
 
 
+################################################################################
 def test_ddl_create_space_at_catchup_with_master_switchover(cluster: Cluster):
     # For quorum.
     i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1)
@@ -704,3 +716,443 @@ def test_ddl_create_space_at_catchup_with_master_switchover(cluster: Cluster):
 
     # A master catches up by snapshot
     assert i4.call("box.space._space.index.name:get", space_name) is not None
+
+
+################################################################################
+def test_ddl_drop_space_normal(cluster: Cluster):
+    # 2 replicasets with 2 replicas each
+    i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2)
+
+    # Set up.
+    space_name = "things"
+    cluster.create_space(
+        dict(
+            name=space_name,
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", space_name) is not None
+
+    # Actual behaviour we're testing
+    index = i1.cas("drop_space", space=space_name)
+    index_commit = index + 1
+    for i in cluster.instances:
+        i.raft_wait_index(index_commit)
+
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", space_name) is None
+
+    # Now we can create another space with the same name.
+    cluster.create_space(
+        dict(
+            name=space_name,
+            format=[
+                dict(name="id", type="unsigned", is_nullable=False),
+                dict(name="value", type="any", is_nullable=False),
+            ],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", space_name) is not None
+
+
+################################################################################
+def test_ddl_drop_space_partial_failure(cluster: Cluster):
+    # First 3 are for quorum.
+    i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=1)
+    # Test subjects.
+    i4 = cluster.add_instance(wait_online=True, replicaset_id="R99")
+    i5 = cluster.add_instance(wait_online=True, replicaset_id="R99")
+
+    # Set up.
+    space_name = "trinkets"
+    cluster.create_space(
+        dict(
+            name=space_name,
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+    index = i1.cas("insert", space_name, [9])
+    for i in cluster.instances:
+        i.raft_wait_index(index)
+
+    # Put a replicaset to sleep.
+    i4.terminate()
+    i5.terminate()
+
+    # Ddl fails because all masters must be present.
+    index = i1.cas("drop_space", space=space_name)
+    index_commit = index + 1
+    with pytest.raises(ReturnError, match="timeout"):
+        i1.raft_wait_index(index_commit, timeout=3)
+
+    entry, *_ = i1.call(
+        "box.space._raft_log:select", None, dict(iterator="lt", limit=1)
+    )
+    # Has not yet been finalized
+    assert entry[4][1][0] == "ddl_prepare"
+
+    # Space is not yet dropped.
+    assert i1.call("box.space._space.index.name:get", space_name) is not None
+    assert i2.call("box.space._space.index.name:get", space_name) is not None
+    assert i3.call("box.space._space.index.name:get", space_name) is not None
+
+    # And no data is lost yet.
+    assert i1.call("box.space.trinkets:get", 9) == [9]
+    assert i2.call("box.space.trinkets:get", 9) == [9]
+    assert i3.call("box.space.trinkets:get", 9) == [9]
+
+    # But the space is marked not operable.
+    assert not i1.eval(
+        "return box.space._pico_space.index.name:get(...).operable", space_name
+    )
+    assert not i2.eval(
+        "return box.space._pico_space.index.name:get(...).operable", space_name
+    )
+    assert not i3.eval(
+        "return box.space._pico_space.index.name:get(...).operable", space_name
+    )
+
+    # TODO: test manual ddl abort
+
+    # Wakeup the sleeping master.
+    i4.start()
+    i4.wait_online()
+
+    # TODO: how do we sync raft log at this point?
+    time.sleep(2)
+
+    # Now space is dropped.
+    assert i1.call("box.space._space.index.name:get", space_name) is None
+    assert i2.call("box.space._space.index.name:get", space_name) is None
+    assert i3.call("box.space._space.index.name:get", space_name) is None
+    assert i4.call("box.space._space.index.name:get", space_name) is None
+
+    # And a replica catches up by raft log successfully.
+    i5.start()
+    i5.wait_online()
+    assert i5.call("box.space._space.index.name:get", space_name) is None
+
+
+################################################################################
+def test_ddl_drop_space_by_raft_log_at_catchup(cluster: Cluster):
+    # i1 is for quorum
+    i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=1)
+    i2 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+    # This one will be catching up.
+    i3 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+
+    # Set up.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "replace_me") is not None
+
+    cluster.create_space(
+        dict(
+            name="drop_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "drop_me") is not None
+
+    # i3 will be catching up.
+    i3.terminate()
+
+    # Drop the spaces
+    for space_name in ["replace_me", "drop_me"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+
+    #
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[
+                dict(name="#", type="unsigned", is_nullable=False),
+            ],
+            primary_key=["#"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "replace_me") is not None
+    assert i2.call("box.space._space.index.name:get", "replace_me") is not None
+
+    # Wake up the catching up instance.
+    i3.start()
+    i3.wait_online()
+
+    # The space was dropped.
+    assert i3.call("box.space._space.index.name:get", "drop_me") is None
+
+    # The space was dropped and a new one was created without conflict.
+    format = i3.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+
+
+################################################################################
+def test_ddl_drop_space_by_raft_log_at_boot(cluster: Cluster):
+    # These guys are for quorum.
+    i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1)
+
+    #
+    # Set up.
+    #
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "replace_me") is not None
+
+    cluster.create_space(
+        dict(
+            name="drop_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "drop_me") is not None
+
+    #
+    # Drop spaces.
+    #
+    for space_name in ["replace_me", "drop_me"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+
+    #
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[
+                dict(name="#", type="unsigned", is_nullable=False),
+            ],
+            primary_key=["#"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "replace_me") is not None
+    assert i2.call("box.space._space.index.name:get", "replace_me") is not None
+
+    #
+    # Add a new replicaset.
+    #
+    i3 = cluster.add_instance(wait_online=False, replicaset_id="r99")
+    i4 = cluster.add_instance(wait_online=False, replicaset_id="r99")
+    i3.start()
+    i4.start()
+    i3.wait_online()
+    i4.wait_online()
+
+    #
+    # Both caught up successfully.
+    #
+    assert i3.call("box.space._space.index.name:get", "drop_me") is None
+    assert i4.call("box.space._space.index.name:get", "drop_me") is None
+
+    format = i3.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+    format = i4.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+
+
+################################################################################
+def test_ddl_drop_space_by_snapshot_on_replica(cluster: Cluster):
+    # i1 is for quorum
+    i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=1)
+    i2 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+    # This one will be catching up.
+    i3 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+
+    # Set up.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "replace_me") is not None
+
+    cluster.create_space(
+        dict(
+            name="drop_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "drop_me") is not None
+
+    # i3 will be catching up.
+    i3.terminate()
+
+    for space_name in ["replace_me", "drop_me"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[
+                dict(name="#", type="unsigned", is_nullable=False),
+            ],
+            primary_key=["#"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "replace_me") is not None
+    assert i2.call("box.space._space.index.name:get", "replace_me") is not None
+
+    # Compact raft log to trigger snapshot generation.
+    i1.raft_compact_log()
+    i2.raft_compact_log()
+
+    # Wake up the catching up instance.
+    i3.start()
+    i3.wait_online()
+
+    # The space was dropped.
+    assert i3.call("box.space._space.index.name:get", "drop_me") is None
+
+    # The space was dropped and a new one was created without conflict.
+    format = i3.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+
+
+################################################################################
+def test_ddl_drop_space_by_snapshot_on_master(cluster: Cluster):
+    # These ones are for quorum.
+    i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1)
+    # This is a replicaset master, who will be following along with the ddl.
+    i3 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+    # This is a replica, who will become master and be catching up.
+    i4 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+
+    # Set up.
+    cluster.create_space(
+        dict(
+            name="space_to_drop",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+    cluster.create_space(
+        dict(
+            name="space_to_replace",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+
+    for space_name in ["space_to_drop", "space_to_replace"]:
+        for i in cluster.instances:
+            assert i.call("box.space._space.index.name:get", space_name) is not None
+
+    # i4 will be catching up.
+    i4.terminate()
+
+    #
+    # Drop spaces.
+    #
+    for space_name in ["space_to_drop", "space_to_replace"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        i3.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+        assert i3.call("box.space._space.index.name:get", space_name) is None
+
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="space_to_replace",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "space_to_replace") is not None
+    assert i2.call("box.space._space.index.name:get", "space_to_replace") is not None
+    assert i3.call("box.space._space.index.name:get", "space_to_replace") is not None
+
+    # Compact raft log to trigger snapshot generation.
+    i1.raft_compact_log()
+    i2.raft_compact_log()
+
+    # Put i3 to sleep to trigger master switchover.
+    i3.terminate()
+
+    # Wake up the catching up instance. i4 has become master and will be catching up by snapshot.
+    i4.start()
+    i4.wait_online()
+
+    # The space was dropped.
+    # assert i4.call("box.space._space.index.name:get", "space_to_drop") is None
+    # The space was replaced.
+    assert i4.call("box.space._space.index.name:get", "space_to_replace") is not None
-- 
GitLab


From e9b1b821cd18477b2b3a2108bf79a2af89255e60 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 2 Jun 2023 19:24:27 +0300
Subject: [PATCH 4/6] fix: ddl drop space from snapshot
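
Applying a snapshot on a replicaset master previously only created spaces, so
a space dropped (or replaced) between snapshots survived in the tarantool
schema. apply_snapshot_data now records each space's schema_version before
truncating _pico_space, and apply_ddl_changes_on_replicaset_master uses that
map to drop spaces that are gone from the snapshot and to drop-and-recreate
spaces whose schema_version has advanced.

A self-contained toy sketch of that reconciliation (space id -> schema
version, plain maps instead of the real storage):

    use std::collections::HashMap;

    // Decide what to do with each space by comparing the versions saved
    // before the snapshot was applied with the versions found after it.
    fn reconcile(old: &HashMap<u32, u64>, new: &HashMap<u32, u64>) -> Vec<String> {
        let mut actions = Vec::new();
        for id in old.keys() {
            if !new.contains_key(id) {
                actions.push(format!("drop space {id}"));
            }
        }
        for (id, v_new) in new {
            match old.get(id) {
                Some(v_old) if v_old == v_new => actions.push(format!("keep space {id}")),
                Some(_) => actions.push(format!("recreate space {id}")),
                None => actions.push(format!("create space {id}")),
            }
        }
        actions
    }

    fn main() {
        let old = HashMap::from([(512, 1), (513, 1)]);
        let new = HashMap::from([(512, 2), (514, 1)]);
        for action in reconcile(&old, &new) {
            println!("{action}");
        }
    }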

---
 src/storage.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 60 insertions(+), 6 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index 9326516a7e..7b65f2aa05 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -19,7 +19,7 @@ use crate::traft::RaftId;
 use crate::traft::Result;
 
 use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::marker::PhantomData;
 use std::rc::Rc;
 
@@ -550,6 +550,12 @@ impl Clusterwide {
     pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> {
         debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() });
 
+        // We need to save these before truncating _pico_space.
+        let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new();
+        for space_def in self.spaces.iter()? {
+            old_space_versions.insert(space_def.id, space_def.schema_version);
+        }
+
         let mut dont_exist_yet = Vec::new();
         for space_dump in &data.space_dumps {
             let space_name = &space_dump.space_name;
@@ -567,7 +573,7 @@ impl Clusterwide {
         // If we're not the replication master, the rest of the data will come
         // via tarantool replication.
         if is_master {
-            self.apply_ddl_changes_on_replicaset_master()?;
+            self.apply_ddl_changes_on_replicaset_master(&old_space_versions)?;
             set_local_schema_version(data.schema_version)?;
         }
 
@@ -590,8 +596,59 @@ impl Clusterwide {
         Ok(())
     }
 
-    pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> {
+    pub fn apply_ddl_changes_on_replicaset_master(
+        &self,
+        old_space_versions: &HashMap<SpaceId, u64>,
+    ) -> traft::Result<()> {
+        debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+
+        let mut space_defs = Vec::new();
+        let mut new_space_ids = HashSet::new();
         for space_def in self.spaces.iter()? {
+            new_space_ids.insert(space_def.id);
+            space_defs.push(space_def);
+        }
+
+        // First we drop all the spaces which have been dropped.
+        for &old_space_id in old_space_versions.keys() {
+            if new_space_ids.contains(&old_space_id) {
+                // Will be handled later.
+                continue;
+            }
+
+            // Space was dropped.
+            let abort_reason = ddl_drop_space_on_master(old_space_id)?;
+            if let Some(e) = abort_reason {
+                return Err(Error::other(format!(
+                    "failed to drop space {old_space_id}: {e}"
+                )));
+            }
+        }
+
+        // Now create any new spaces, or replace ones that changed.
+        for space_def in &space_defs {
+            let space_id = space_def.id;
+
+            if let Some(&v_old) = old_space_versions.get(&space_id) {
+                let v_new = space_def.schema_version;
+                assert!(v_old <= v_new);
+
+                if v_old == v_new {
+                    // Space is up to date.
+                    continue;
+                }
+
+                // Space was updated, need to drop it and recreate.
+                let abort_reason = ddl_drop_space_on_master(space_def.id)?;
+                if let Some(e) = abort_reason {
+                    return Err(Error::other(format!(
+                        "failed to drop space {space_id}: {e}"
+                    )));
+                }
+            } else {
+                // New space.
+            }
+
             if !space_def.operable {
                 // If it so happens, that we receive an unfinished schema change via snapshot,
                 // which will somehow get finished without the governor sending us
@@ -610,9 +667,6 @@ impl Clusterwide {
 
         // TODO: secondary indexes
 
-        // TODO: check if a space exists here in box.space._space, but doesn't
-        // exist in pico._space, then delete it
-
         Ok(())
     }
 
-- 
GitLab


From 79267b4e5cec3eed14d903d0236384441cda384d Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 5 Jun 2023 10:17:25 +0300
Subject: [PATCH 5/6] refactor: extract function ddl_meta_space_update_operable
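
NodeImpl flipped the "operable" flag on a space definition and on all of its
index definitions in three nearly identical blocks (DdlPrepare/DropSpace,
DdlCommit/CreateSpace and DdlAbort/DropSpace). That logic now lives in a
single ddl_meta_space_update_operable helper in src/storage.rs.

Conceptually (placeholder structs below, not the real Clusterwide storage),
the helper does this:

    struct SpaceDef { id: u32, operable: bool }
    struct IndexDef { space_id: u32, id: u32, operable: bool }

    // Mark the space and every index that belongs to it (not) operable.
    fn meta_space_update_operable(space: &mut SpaceDef, indexes: &mut [IndexDef], operable: bool) {
        space.operable = operable;
        for index in indexes.iter_mut().filter(|i| i.space_id == space.id) {
            index.operable = operable;
        }
    }

    fn main() {
        let mut space = SpaceDef { id: 1027, operable: false };
        let mut indexes = vec![
            IndexDef { space_id: 1027, id: 0, operable: false },
            IndexDef { space_id: 1027, id: 1, operable: false },
        ];
        meta_space_update_operable(&mut space, &mut indexes, true);
        assert!(space.operable && indexes.iter().all(|i| i.operable));
    }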

---
 src/storage.rs    | 23 ++++++++++++++++++++
 src/traft/node.rs | 53 +++++++----------------------------------------
 2 files changed, 30 insertions(+), 46 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index 7b65f2aa05..f9bd521c9e 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1670,6 +1670,29 @@ impl ToEntryIter for Indexes {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// ddl meta
+////////////////////////////////////////////////////////////////////////////////
+
+/// Updates the field `"operable"` for a space with id `space_id` and any
+/// necessary entities (currently all existing indexes).
+///
+/// This function is called when applying the different ddl operations.
+pub fn ddl_meta_space_update_operable(
+    storage: &Clusterwide,
+    space_id: SpaceId,
+    operable: bool,
+) -> traft::Result<()> {
+    storage.spaces.update_operable(space_id, operable)?;
+    let iter = storage.indexes.by_space_id(space_id)?;
+    for index in iter {
+        storage
+            .indexes
+            .update_operable(index.space_id, index.id, operable)?;
+    }
+    Ok(())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ddl
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 54b666511f..dba0110e3c 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -13,10 +13,10 @@ use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::rpc;
 use crate::schema::{Distribution, IndexDef, SpaceDef};
-use crate::storage::ddl_abort_on_master;
 use crate::storage::local_schema_version;
 use crate::storage::SnapshotData;
 use crate::storage::ToEntryIter as _;
+use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable};
 use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
 use crate::stringify_cfunc;
 use crate::sync;
@@ -881,21 +881,8 @@ impl NodeImpl {
                 // Update pico metadata.
                 match ddl {
                     Ddl::CreateSpace { id, .. } => {
-                        self.storage
-                            .spaces
-                            .update_operable(id, true)
-                            .expect("storage error");
-                        self.storage
-                            .indexes
-                            .update_operable(id, 0, true)
-                            .expect("storage error");
-                        // For now we just assume that during space creation index with id 1
-                        // exists if and only if it is a bucket_id index.
-                        let res = self.storage.indexes.update_operable(id, 1, true);
-                        // TODO: maybe we should first check if this index
-                        // exists or check the space definition if this should
-                        // be done, but for now we just ignore the error "no such index"
-                        let _ = res;
+                        ddl_meta_space_update_operable(&self.storage, id, true)
+                            .expect("storage shouldn't fail");
                     }
 
                     Ddl::DropSpace { id } => {
@@ -962,21 +949,8 @@ impl NodeImpl {
                     }
 
                     Ddl::DropSpace { id } => {
-                        self.storage
-                            .spaces
-                            .update_operable(id, true)
-                            .expect("storage should never fail");
-                        let iter = self
-                            .storage
-                            .indexes
-                            .by_space_id(id)
-                            .expect("storage should never fail");
-                        for index in iter {
-                            self.storage
-                                .indexes
-                                .update_operable(index.space_id, index.id, true)
-                                .expect("storage should never fail");
-                        }
+                        ddl_meta_space_update_operable(&self.storage, id, true)
+                            .expect("storage shouldn't fail");
                     }
 
                     _ => {
@@ -1159,21 +1133,8 @@ impl NodeImpl {
             }
 
             Ddl::DropSpace { id } => {
-                self.storage
-                    .spaces
-                    .update_operable(id, false)
-                    .expect("storage should never fail");
-                let iter = self
-                    .storage
-                    .indexes
-                    .by_space_id(id)
-                    .expect("storage should never fail");
-                for index in iter {
-                    self.storage
-                        .indexes
-                        .update_operable(index.space_id, index.id, false)
-                        .expect("storage should never fail");
-                }
+                ddl_meta_space_update_operable(&self.storage, id, false)
+                    .expect("storage shouldn't fail");
             }
 
             Ddl::DropIndex { index_id, space_id } => {
-- 
GitLab


From 58b16cb5958c31df3289893e7e419e4514fc341a Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 5 Jun 2023 10:22:53 +0300
Subject: [PATCH 6/6] fix: ddl abort for create sharded space
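
Aborting a CreateSpace ddl only removed the primary index definition (id 0)
and the space definition from the picodata metadata, so for a sharded space
the bucket_id index definition (id 1) was left behind. The new
ddl_meta_drop_space helper deletes the space definition together with every
index definition that references it, and is used both on DdlCommit/DropSpace
and on DdlAbort/CreateSpace. A test_ddl_create_space_abort regression test
checks that both the tarantool and the picodata metadata are cleaned up.

A toy illustration (plain vectors instead of the real storage) of the
difference:

    #[derive(Debug, PartialEq)]
    struct IndexDef { space_id: u32, id: u32 }

    // Old behaviour: only the primary key definition was removed.
    fn abort_old(space_id: u32, indexes: &mut Vec<IndexDef>) {
        indexes.retain(|i| !(i.space_id == space_id && i.id == 0));
    }

    // New behaviour: every index definition of the space is removed.
    fn abort_new(space_id: u32, indexes: &mut Vec<IndexDef>) {
        indexes.retain(|i| i.space_id != space_id);
    }

    fn main() {
        let defs = || vec![
            IndexDef { space_id: 887, id: 0 }, // primary_key
            IndexDef { space_id: 887, id: 1 }, // bucket_id
        ];

        let mut leaked = defs();
        abort_old(887, &mut leaked);
        assert_eq!(leaked, vec![IndexDef { space_id: 887, id: 1 }]); // bucket_id leaks

        let mut clean = defs();
        abort_new(887, &mut clean);
        assert!(clean.is_empty());
    }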

---
 src/storage.rs       | 12 ++++++++
 src/traft/node.rs    | 20 ++------------
 test/int/test_ddl.py | 66 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index f9bd521c9e..21bc0c4163 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1693,6 +1693,18 @@ pub fn ddl_meta_space_update_operable(
     Ok(())
 }
 
+/// Deletes the picodata internal metadata for a space with id `space_id`.
+///
+/// This function is called when applying the different ddl operations.
+pub fn ddl_meta_drop_space(storage: &Clusterwide, space_id: SpaceId) -> traft::Result<()> {
+    storage.spaces.delete(space_id)?;
+    let iter = storage.indexes.by_space_id(space_id)?;
+    for index in iter {
+        storage.indexes.delete(index.space_id, index.id)?;
+    }
+    Ok(())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ddl
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/node.rs b/src/traft/node.rs
index dba0110e3c..738fbb2a78 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -13,6 +13,7 @@ use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::rpc;
 use crate::schema::{Distribution, IndexDef, SpaceDef};
+use crate::storage::ddl_meta_drop_space;
 use crate::storage::local_schema_version;
 use crate::storage::SnapshotData;
 use crate::storage::ToEntryIter as _;
@@ -886,21 +887,7 @@ impl NodeImpl {
                     }
 
                     Ddl::DropSpace { id } => {
-                        self.storage
-                            .spaces
-                            .delete(id)
-                            .expect("storage should never fail");
-                        let iter = self
-                            .storage
-                            .indexes
-                            .by_space_id(id)
-                            .expect("storage should never fail");
-                        for index in iter {
-                            self.storage
-                                .indexes
-                                .delete(index.space_id, index.id)
-                                .expect("storage should never fail");
-                        }
+                        ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
                     }
 
                     _ => {
@@ -944,8 +931,7 @@ impl NodeImpl {
                 // Update pico metadata.
                 match ddl {
                     Ddl::CreateSpace { id, .. } => {
-                        self.storage.indexes.delete(id, 0).expect("storage error");
-                        self.storage.spaces.delete(id).expect("storage error");
+                        ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
                     }
 
                     Ddl::DropSpace { id } => {
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 87ae1e3e5e..79c7486e2d 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -429,6 +429,72 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster):
         assert i.eval("return box.space._pico_space:get(...).operable", space_id)
 
 
+################################################################################
+def test_ddl_create_space_abort(cluster: Cluster):
+    i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=1)
+
+    # Create a conflict to force ddl abort.
+    space_name = "space_name_conflict"
+    i3.eval("box.schema.space.create(...)", space_name)
+
+    # Terminate i3 so that other instances actually partially apply the ddl.
+    i3.terminate()
+
+    # Initiate ddl create space.
+    space_id = 887
+    index_fin = i1.propose_create_space(
+        dict(
+            id=space_id,
+            name=space_name,
+            format=[
+                dict(name="id", type="unsigned", is_nullable=False),
+            ],
+            primary_key=[dict(field="id")],
+            distribution=dict(
+                kind="sharded_implicitly",
+                sharding_key=["id"],
+                sharding_fn="murmur3",
+            ),
+        ),
+        wait_index=False,
+    )
+
+    index_prepare = index_fin - 1
+    i1.raft_wait_index(index_prepare)
+    i2.raft_wait_index(index_prepare)
+
+    def get_index_names(i, space_id):
+        return i.eval(
+            """
+            local space_id = ...
+            local res = box.space._pico_index:select({space_id})
+            for i, t in ipairs(res) do
+                res[i] = t.name
+            end
+            return res
+        """
+        )
+
+    assert i1.call("box.space._space:get", space_id) is not None
+    assert get_index_names(i1, space_id) == ["primary_key", "bucket_id"]
+    assert i2.call("box.space._space:get", space_id) is not None
+    assert get_index_names(i2, space_id) == ["primary_key", "bucket_id"]
+
+    # Wake the instance so that governor finds out there's a conflict
+    # and aborts the ddl op.
+    i3.start()
+    i3.wait_online()
+
+    # Everything was cleaned up.
+    assert i1.call("box.space._space:get", space_id) is None
+    assert i2.call("box.space._space:get", space_id) is None
+    assert i3.call("box.space._space:get", space_id) is None
+
+    assert get_index_names(i1, space_id) == []
+    assert get_index_names(i2, space_id) == []
+    assert get_index_names(i3, space_id) == []
+
+
 ################################################################################
 def test_ddl_create_space_partial_failure(cluster: Cluster):
     # i2 & i3 are for quorum
-- 
GitLab