From 58b16cb5958c31df3289893e7e419e4514fc341a Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 5 Jun 2023 10:22:53 +0300
Subject: [PATCH] fix: ddl abort for create sharded space

---
 src/storage.rs       | 12 ++++++++
 src/traft/node.rs    | 20 ++------------
 test/int/test_ddl.py | 66 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index f9bd521c9e..21bc0c4163 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1693,6 +1693,18 @@ pub fn ddl_meta_space_update_operable(
     Ok(())
 }
 
+/// Deletes the picodata internal metadata for a space with id `space_id`.
+///
+/// This is called when applying the ddl operations which need to drop the
+/// space's metadata, i.e. `DropSpace` and the abort of a `CreateSpace`.
+pub fn ddl_meta_drop_space(storage: &Clusterwide, space_id: SpaceId) -> traft::Result<()> {
+    storage.spaces.delete(space_id)?;
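+    // Besides the primary key, a space may own other index records (e.g. the
+    // implicit `bucket_id` index of a sharded space), so remove all of them.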
+    let iter = storage.indexes.by_space_id(space_id)?;
+    for index in iter {
+        storage.indexes.delete(index.space_id, index.id)?;
+    }
+    Ok(())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ddl
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/node.rs b/src/traft/node.rs
index dba0110e3c..738fbb2a78 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -13,6 +13,7 @@ use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::rpc;
 use crate::schema::{Distribution, IndexDef, SpaceDef};
+use crate::storage::ddl_meta_drop_space;
 use crate::storage::local_schema_version;
 use crate::storage::SnapshotData;
 use crate::storage::ToEntryIter as _;
@@ -886,21 +887,7 @@ impl NodeImpl {
                     }
 
                     Ddl::DropSpace { id } => {
-                        self.storage
-                            .spaces
-                            .delete(id)
-                            .expect("storage should never fail");
-                        let iter = self
-                            .storage
-                            .indexes
-                            .by_space_id(id)
-                            .expect("storage should never fail");
-                        for index in iter {
-                            self.storage
-                                .indexes
-                                .delete(index.space_id, index.id)
-                                .expect("storage should never fail");
-                        }
+                        ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
                     }
 
                     _ => {
@@ -944,8 +931,7 @@ impl NodeImpl {
                 // Update pico metadata.
                 match ddl {
                     Ddl::CreateSpace { id, .. } => {
-                        self.storage.indexes.delete(id, 0).expect("storage error");
-                        self.storage.spaces.delete(id).expect("storage error");
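+                        // Reuse the DropSpace cleanup so that all of the
+                        // space's index records are removed, not just the
+                        // primary key.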
+                        ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
                     }
 
                     Ddl::DropSpace { id } => {
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 87ae1e3e5e..79c7486e2d 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -429,6 +429,72 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster):
         assert i.eval("return box.space._pico_space:get(...).operable", space_id)
 
 
+################################################################################
+def test_ddl_create_space_abort(cluster: Cluster):
+    i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=1)
+
+    # Create a conflict to force ddl abort.
+    space_name = "space_name_conflict"
+    i3.eval("box.schema.space.create(...)", space_name)
+
+    # Terminate i3 so that other instances actually partially apply the ddl.
+    i3.terminate()
+
+    # Initiate ddl create space.
+    space_id = 887
+    index_fin = i1.propose_create_space(
+        dict(
+            id=space_id,
+            name=space_name,
+            format=[
+                dict(name="id", type="unsigned", is_nullable=False),
+            ],
+            primary_key=[dict(field="id")],
+            distribution=dict(
+                kind="sharded_implicitly",
+                sharding_key=["id"],
+                sharding_fn="murmur3",
+            ),
+        ),
+        wait_index=False,
+    )
+
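+    # `index_fin` is the raft index of the entry that will finalize the ddl
+    # (commit or abort), so the DdlPrepare entry is the one right before it.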
+    index_prepare = index_fin - 1
+    i1.raft_wait_index(index_prepare)
+    i2.raft_wait_index(index_prepare)
+
+    def get_index_names(i, space_id):
+        return i.eval(
+            """
+            local space_id = ...
+            local res = box.space._pico_index:select({space_id})
+            for i, t in ipairs(res) do
+                res[i] = t.name
+            end
+            return res
+        """
+        )
+
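+    # The prepare entry has been applied on i1 & i2: the space and both of its
+    # indexes (the primary key and the implicit bucket_id) already exist there.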
+    assert i1.call("box.space._space:get", space_id) is not None
+    assert get_index_names(i1, space_id) == ["primary_key", "bucket_id"]
+    assert i2.call("box.space._space:get", space_id) is not None
+    assert get_index_names(i2, space_id) == ["primary_key", "bucket_id"]
+
+    # Wake the instance so that governor finds out there's a conflict
+    # and aborts the ddl op.
+    i3.start()
+    i3.wait_online()
+
+    # Everything was cleaned up.
+    assert i1.call("box.space._space:get", space_id) is None
+    assert i2.call("box.space._space:get", space_id) is None
+    assert i3.call("box.space._space:get", space_id) is None
+
+    assert get_index_names(i1, space_id) == []
+    assert get_index_names(i2, space_id) == []
+    assert get_index_names(i3, space_id) == []
+
+
 ################################################################################
 def test_ddl_create_space_partial_failure(cluster: Cluster):
     # i2 & i3 are for quorum
-- 
GitLab