From bc308b562447aa19ada67db20353b6d0506683d2 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 12 May 2023 11:45:51 +0300
Subject: [PATCH] feat: implement the monotonically increasing
 next_schema_version pico property

---
 src/storage.rs       | 25 +++++++++++++++++++++++++
 src/traft/node.rs    |  3 +++
 test/conftest.py     |  7 +++++++
 test/int/test_ddl.py | 12 ++++--------
 4 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index b781244af7..cf7f7ec70b 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -622,10 +622,35 @@ pub trait TClusterwideSpaceIndex {
     pub enum PropertyName {
         ReplicationFactor = "replication_factor",
         VshardBootstrapped = "vshard_bootstrapped",
+        // TODO: remove this
         DesiredSchemaVersion = "desired_schema_version",
+
+        /// Pending ddl operation which is to be either committed or aborted.
+        ///
+        /// Is only present during the time between the last ddl prepare
+        /// operation and the corresponding ddl commit or abort operation.
         PendingSchemaChange = "pending_schema_change",
+
+        /// Schema version of the pending ddl prepare operation.
+        /// This will be the current schema version if the next entry is a ddl
+        /// commit operation.
+        ///
+        /// Is only present during the time between the last ddl prepare
+        /// operation and the corresponding ddl commit or abort operation.
         PendingSchemaVersion = "pending_schema_version",
+
+        /// Current schema version. Increases with every ddl commit operation.
+        ///
+        /// This is equal to [`pico_schema_version`] during the time between
+        /// the last ddl commit or abort operation and the next ddl prepare
+        /// operation.
         CurrentSchemaVersion = "current_schema_version",
+
+        /// Schema version which should be used for the next ddl prepare
+        /// operation. This increases with every ddl prepare operation in the
+        /// log no matter if it is committed or aborted.
+        /// This guards us from some painfull corner cases.
+        NextSchemaVersion = "next_schema_version",
     }
 }
 
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 167c425279..d1d346975c 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -1001,6 +1001,9 @@ impl NodeImpl {
         self.storage
             .properties
             .put(PropertyName::PendingSchemaVersion, &schema_version)?;
+        self.storage
+            .properties
+            .put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;
 
         Ok(())
     }
diff --git a/test/conftest.py b/test/conftest.py
index ff42f237ad..0b9b64135c 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -524,6 +524,13 @@ class Instance:
         eprint(f"CaS:\n  {predicate=}\n  {dml=}")
         return self.call(".proc_cas", self.cluster_id, predicate, dml)[0]["index"]
 
+    def next_schema_version(self) -> int:
+        t = self.call("box.space._picodata_property:get", "next_schema_version")
+        if t is None:
+            return 1
+
+        return t[1]
+
     def assert_raft_status(self, state, leader_id=None):
         status = self._raft_status()
 
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index f23164cbfb..529141fc5e 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -13,7 +13,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     # Propose a space creation which will fail
     op = dict(
         kind="ddl_prepare",
-        schema_version=1,
+        schema_version=i1.next_schema_version(),
         ddl=dict(
             kind="create_space",
             id=666,
@@ -40,11 +40,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     # Propose a space creation which will succeed
     op = dict(
         kind="ddl_prepare",
-        # This version number must not be equal to the version of the aborted
-        # change, otherwise all of the instances joined after this request was
-        # committed will be blocked during the attempt to apply the previous
-        # DdlAbort
-        schema_version=2,
+        schema_version=i1.next_schema_version(),
         ddl=dict(
             kind="create_space",
             id=666,
@@ -136,7 +132,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
     # Propose a space creation which will fail
     op = dict(
         kind="ddl_prepare",
-        schema_version=1,
+        schema_version=i1.next_schema_version(),
         ddl=dict(
             kind="create_space",
             id=666,
@@ -176,7 +172,7 @@ def test_ddl_from_snapshot(cluster: Cluster):
     # Propose a space creation which will succeed
     op = dict(
         kind="ddl_prepare",
-        schema_version=1,
+        schema_version=i1.next_schema_version(),
         ddl=dict(
             kind="create_space",
             id=666,
-- 
GitLab