From ac4fa22c47d0dbaf5141671894d278a23cb2bca3 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 2 Jun 2023 19:22:45 +0300
Subject: [PATCH] refactor: extract ddl_create_space_on_master function

---
 src/rpc/ddl_apply.rs |  57 +++++------------------
 src/storage.rs       | 106 ++++++++++++++++++++++++++++++++-----------
 2 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index f874d8e4be..80ee2c8111 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -1,4 +1,5 @@
 use crate::op::Ddl;
+use crate::storage::ddl_create_space_on_master;
 use crate::storage::Clusterwide;
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::tlog;
@@ -9,7 +10,6 @@ use crate::traft::{RaftIndex, RaftTerm};
 use std::time::Duration;
 use tarantool::error::{TarantoolError, TarantoolErrorCode};
 use tarantool::ffi::tarantool as ffi;
-use tarantool::space::{Space, SystemSpace};
 
 crate::define_rpc_request! {
     fn proc_apply_schema_change(req: Request) -> Result<Response> {
@@ -82,54 +82,11 @@ crate::define_rpc_request! {
 // TODO: move this to crate::schema maybe?
 pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
-    let sys_space = Space::from(SystemSpace::Space);
-    let sys_index = Space::from(SystemSpace::Index);
 
     match *ddl {
         Ddl::CreateSpace { id, .. } => {
-            let pico_space_def = storage
-                .spaces
-                .get(id)?
-                .ok_or_else(|| Error::other(format!("space with id #{id} not found")))?;
-            // TODO: set defaults
-            let tt_space_def = pico_space_def.to_space_metadata()?;
-
-            let pico_pk_def = storage.indexes.get(id, 0)?.ok_or_else(|| {
-                Error::other(format!(
-                    "primary index for space {} not found",
-                    pico_space_def.name
-                ))
-            })?;
-            let tt_pk_def = pico_pk_def.to_index_metadata();
-
-            // For now we just assume that during space creation index with id 1
-            // exists if and only if it is a bucket_id index.
-            let mut tt_bucket_id_def = None;
-            let pico_bucket_id_def = storage.indexes.get(id, 1)?;
-            if let Some(def) = &pico_bucket_id_def {
-                tt_bucket_id_def = Some(def.to_index_metadata());
-            }
-
-            let res = (|| -> tarantool::Result<()> {
-                if tt_pk_def.parts.is_empty() {
-                    return Err(tarantool::set_and_get_error!(
-                        tarantool::error::TarantoolErrorCode::ModifyIndex,
-                        "can't create index '{}' in space '{}': parts list cannot be empty",
-                        tt_pk_def.name,
-                        tt_space_def.name,
-                    )
-                    .into());
-                }
-                sys_space.insert(&tt_space_def)?;
-                sys_index.insert(&tt_pk_def)?;
-                if let Some(def) = tt_bucket_id_def {
-                    sys_index.insert(&def)?;
-                }
-                set_local_schema_version(version)?;
-
-                Ok(())
-            })();
-            if let Err(e) = res {
+            let abort_reason = ddl_create_space_on_master(storage, id)?;
+            if let Some(e) = abort_reason {
                 // We return Ok(error) because currently this is the only
                 // way to report an application level error.
                 return Ok(Response::Abort {
@@ -142,5 +99,13 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re
         }
     }
 
+    if let Err(e) = set_local_schema_version(version) {
+        // We return Ok(error) because currently this is the only
+        // way to report an application level error.
+        return Ok(Response::Abort {
+            reason: e.to_string(),
+        });
+    }
+
     Ok(Response::Ok)
 }
diff --git a/src/storage.rs b/src/storage.rs
index 2db24dfcb9..bde42a04e5 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -591,9 +591,6 @@ impl Clusterwide {
     }
 
     pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> {
-        let sys_space = Space::from(SystemSpace::Space);
-        let sys_index = Space::from(SystemSpace::Index);
-
         for space_def in self.spaces.iter()? {
             if !space_def.operable {
                 // If it so happens, that we receive an unfinished schema change via snapshot,
@@ -602,29 +599,12 @@ impl Clusterwide {
                 continue;
             }
 
-            let Some(pk_def) = self.indexes.get(space_def.id, 0)? else {
-                crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}");
-                continue;
-            };
-
-            // For now we just assume that during space creation index with id 1
-            // exists if and only if it is a bucket_id index.
-            let bucket_id_def = self.indexes.get(space_def.id, 1)?;
-
-            // XXX: this logic is duplicated in proc_apply_schema_change, but
-            // the code is so small, it doesn't seem forth it extracting it for
-            // now
-            let tt_space_def = space_def.to_space_metadata()?;
-            let tt_pk_def = pk_def.to_index_metadata();
-            let mut tt_bucket_id_def = None;
-            if let Some(def) = &bucket_id_def {
-                tt_bucket_id_def = Some(def.to_index_metadata());
-            }
-
-            sys_space.replace(&tt_space_def)?;
-            sys_index.replace(&tt_pk_def)?;
-            if let Some(def) = tt_bucket_id_def {
-                sys_index.replace(&def)?;
+            let abort_reason = ddl_create_space_on_master(self, space_def.id)?;
+            if let Some(e) = abort_reason {
+                return Err(Error::other(format!(
+                    "failed to create space {}: {e}",
+                    space_def.id
+                )));
             }
         }
 
@@ -1621,6 +1601,80 @@ impl Indexes {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// ddl
+////////////////////////////////////////////////////////////////////////////////
+
+/// Create tarantool space and any required indexes. Currently it creates a
+/// primary index and a `bucket_id` index if it's a sharded space.
+///
+/// Return values:
+/// * `Ok(None)` in case of success.
+/// * `Ok(Some(abort_reason))` in case of error which should result in a ddl abort.
+/// * `Err(e)` in case of retryable errors.
+///
+// FIXME: this function returns 2 kinds of errors: retryable and non-retryable.
+// Currently this is impelemnted by returning one kind of errors as Err(e) and
+// the other as Ok(Some(e)). This was the simplest solution at the time this
+// function was implemented, as it requires the least amount of boilerplate and
+// error forwarding code. But this signature is not intuitive, so maybe there's
+// room for improvement.
+pub fn ddl_create_space_on_master(
+    storage: &Clusterwide,
+    space_id: SpaceId,
+) -> traft::Result<Option<TntError>> {
+    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+    let sys_space = Space::from(SystemSpace::Space);
+    let sys_index = Space::from(SystemSpace::Index);
+
+    let pico_space_def = storage
+        .spaces
+        .get(space_id)?
+        .ok_or_else(|| Error::other(format!("space with id {space_id} not found")))?;
+    // TODO: set defaults
+    let tt_space_def = pico_space_def.to_space_metadata()?;
+
+    let pico_pk_def = storage.indexes.get(space_id, 0)?.ok_or_else(|| {
+        Error::other(format!(
+            "primary index for space {} not found",
+            pico_space_def.name
+        ))
+    })?;
+    let tt_pk_def = pico_pk_def.to_index_metadata();
+
+    // For now we just assume that during space creation index with id 1
+    // exists if and only if it is a bucket_id index.
+    let mut tt_bucket_id_def = None;
+    let pico_bucket_id_def = storage.indexes.get(space_id, 1)?;
+    if let Some(def) = &pico_bucket_id_def {
+        tt_bucket_id_def = Some(def.to_index_metadata());
+    }
+
+    let res = (|| -> tarantool::Result<()> {
+        if tt_pk_def.parts.is_empty() {
+            return Err(tarantool::set_and_get_error!(
+                tarantool::error::TarantoolErrorCode::ModifyIndex,
+                "can't create index '{}' in space '{}': parts list cannot be empty",
+                tt_pk_def.name,
+                tt_space_def.name,
+            )
+            .into());
+        }
+        sys_space.insert(&tt_space_def)?;
+        sys_index.insert(&tt_pk_def)?;
+        if let Some(def) = tt_bucket_id_def {
+            sys_index.insert(&def)?;
+        }
+
+        Ok(())
+    })();
+    Ok(res.err())
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// local schema version
+////////////////////////////////////////////////////////////////////////////////
+
 pub fn local_schema_version() -> tarantool::Result<u64> {
     let space_schema = Space::from(SystemSpace::Schema);
     let tuple = space_schema.get(&["local_schema_version"])?;
-- 
GitLab