From 7d9d08ec95de880ec0dc0aeef515202c90981b14 Mon Sep 17 00:00:00 2001
From: Egor Ivkov <e.o.ivkov@gmail.com>
Date: Wed, 7 Jun 2023 18:06:20 +0300
Subject: [PATCH] feat: manual ddl abort

---
 src/luamod.rs        | 25 ++++++++++++++++--
 src/schema.rs        | 61 ++++++++++++++++++++++++++++++++++++--------
 test/conftest.py     | 22 +++++++++++++++-
 test/int/test_ddl.py | 12 +++++++++
 4 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/src/luamod.rs b/src/luamod.rs
index 362c3b70ff..463acf2768 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -692,7 +692,7 @@ pub(crate) fn setup(args: &args::Run) {
             format = ...,         -- table of Field. See pico.help(\"Field\")
             primary_key = ...,    -- table of string
             distribution = ...,   -- string, one of 'global' | 'sharded'
-            timeout_sec = ...,    -- number
+            timeout = ...,    -- number, in seconds
         }
         ========================
 
@@ -701,7 +701,7 @@ pub(crate) fn setup(args: &args::Run) {
         "},
         {
             tlua::function1(|params: CreateSpaceParams| -> traft::Result<RaftIndex> {
-                let timeout = Duration::from_secs_f64(params.timeout_sec);
+                let timeout = Duration::from_secs_f64(params.timeout);
                 let storage = &node::global()?.storage;
                 let params = params.validate(storage)?;
                 // TODO: check space creation and rollback
@@ -713,6 +713,27 @@ pub(crate) fn setup(args: &args::Run) {
             })
         },
     );
+    luamod_set(
+        &l,
+        "abort_ddl",
+        indoc! {"
+        pico.abort_ddl(timeout)
+        ========================
+
+        Aborts a pending DDL operation.
+
+        Returns an index of the corresponding DdlAbort raft entry, or an error if
+        there is no pending DDL operation.
+
+        # Params
+        1. timeout - number, in seconds
+        "},
+        {
+            tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> {
+                schema::abort_ddl(Duration::from_secs_f64(timeout))
+            })
+        },
+    );
     luamod_set_help_only(
         &l,
         "Field",
diff --git a/src/schema.rs b/src/schema.rs
index c0e4593e50..b69bd451c8 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -214,6 +214,8 @@ pub enum DdlError {
     CreateSpace(#[from] CreateSpaceError),
     #[error("ddl operation was aborted")]
     Aborted,
+    #[error("there is no pending ddl operation")]
+    NoPendingDdl,
 }
 
 #[derive(Debug, Error)]
@@ -275,9 +277,11 @@ pub struct CreateSpaceParams {
     by_field: Option<String>,
     sharding_key: Option<Vec<String>>,
     sharding_fn: Option<ShardingFn>,
+    /// Timeout in seconds.
+    ///
     /// Specifying the timeout identifies how long user is ready to wait for ddl to be applied.
     /// But it does not provide guarantees that a ddl will be aborted if wait for commit timeouts.
-    pub timeout_sec: f64,
+    pub timeout: f64,
 }
 
 impl CreateSpaceParams {
@@ -519,6 +523,41 @@ pub fn prepare_ddl(op: Ddl, timeout: Duration) -> traft::Result<RaftIndex> {
     }
 }
 
+/// Aborts a pending DDL operation and waits for the abort to be committed locally.
+/// If `timeout` is reached earlier, returns an error.
+///
+/// Returns an index of the corresponding DdlAbort raft entry, or an error if
+/// there is no pending DDL operation.
+pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> {
+    let node = node::global()?;
+    loop {
+        if node.storage.properties.pending_schema_change()?.is_none() {
+            return Err(DdlError::NoPendingDdl.into());
+        }
+        let index = node.get_index();
+        let term = raft::Storage::term(&node.raft_storage, index)?;
+        let predicate = rpc::cas::Predicate {
+            index,
+            term,
+            ranges: vec![
+                rpc::cas::Range::new(ClusterwideSpaceId::Property as _)
+                    .eq((PropertyName::PendingSchemaChange,)),
+                rpc::cas::Range::new(ClusterwideSpaceId::Property as _)
+                    .eq((PropertyName::GlobalSchemaVersion,)),
+                rpc::cas::Range::new(ClusterwideSpaceId::Property as _)
+                    .eq((PropertyName::NextSchemaVersion,)),
+            ],
+        };
+        let (index, term) = compare_and_swap(Op::DdlAbort, predicate)?;
+        node.wait_index(index, timeout)?;
+        if raft::Storage::term(&node.raft_storage, index)? != term {
+            // leader switched - retry
+            continue;
+        }
+        return Ok(index);
+    }
+}
+
 mod tests {
     use tarantool::space::FieldType;
 
@@ -563,7 +602,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -581,7 +620,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -599,7 +638,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -617,7 +656,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -635,7 +674,7 @@ mod tests {
             by_field: None,
             sharding_key: Some(vec![field2.name.clone()]),
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -653,7 +692,7 @@ mod tests {
             by_field: None,
             sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]),
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -671,7 +710,7 @@ mod tests {
             by_field: Some(field2.name.clone()),
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -689,7 +728,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -707,7 +746,7 @@ mod tests {
             by_field: Some(field2.name.clone()),
             sharding_key: Some(vec![]),
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap_err();
@@ -725,7 +764,7 @@ mod tests {
             by_field: Some(field2.name),
             sharding_key: None,
             sharding_fn: None,
-            timeout_sec: 0.0,
+            timeout: 0.0,
         }
         .validate(&storage)
         .unwrap();
diff --git a/test/conftest.py b/test/conftest.py
index f0d20acc9f..d602f41b6c 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -697,10 +697,21 @@ class Instance:
         Works through Lua API in difference to `propose_create_space`,
         which is more low level and directly proposes a raft entry.
         """
-        params["timeout_sec"] = timeout
+        params["timeout"] = timeout
         index = self.call("pico.create_space", params, timeout)
         return index
 
+    def abort_ddl(self, timeout: float = 3.0) -> int:
+        """
+        Aborts a pending DDL operation and waits for the abort to be committed locally.
+        If `timeout` is reached earlier, returns an error.
+
+        Returns an index of the corresponding DdlAbort raft entry, or an error if
+        there is no pending DDL operation.
+        """
+        index = self.call("pico.abort_ddl", timeout)
+        return index
+
     def propose_create_space(
         self, space_def: Dict[str, Any], wait_index: bool = True, timeout: int = 3
     ) -> int:
@@ -1040,6 +1051,15 @@ class Cluster:
             if instance.process is not None:
                 instance.raft_wait_index(index)
 
+    def abort_ddl(self, timeout: float = 3.0):
+        """
+        Aborts a pending ddl. Waits for all peers to be aware of it.
+        """
+        index = self.instances[0].abort_ddl(timeout)
+        for instance in self.instances:
+            if instance.process is not None:
+                instance.raft_wait_index(index)
+
     def cas(
         self,
         dml_kind: Literal["insert", "replace", "delete"],
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 27a5ca0317..8c58e794d3 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -2,6 +2,18 @@ import pytest
 from conftest import Cluster, ReturnError
 
 
+def test_ddl_abort(cluster: Cluster):
+    cluster.deploy(instance_count=2)
+
+    with pytest.raises(ReturnError) as e1:
+        cluster.abort_ddl()
+    assert e1.value.args == (
+        "ddl failed: there is no pending ddl operation",
+    )
+
+    # TODO: test manual abort when we have long-running ddls
+
+
 def test_ddl_create_space_lua(cluster: Cluster):
     i1, i2 = cluster.deploy(instance_count=2)
 
-- 
GitLab