From 36cce00b169cd6bd7161d3a28c5c8c4f068827f3 Mon Sep 17 00:00:00 2001
From: Kaitmazian Maksim <m.kaitmazian@picodata.io>
Date: Wed, 28 Aug 2024 10:49:44 +0300
Subject: [PATCH] feat: prevent concurrent schema changes on master by using a
 global lock

A global lock for schema changes that solves the following issue:
An expensive index creation can cause RPC timeouts, resulting in re-sending of the
same RPC even though the operation is still in progress on the master. The second RPC
attempts to create the same index, but it realizes that the index already exists
even if the operation has not been completed yet. To handle this scenario, a global
lock was added in `proc_apply_schema_change` to prevent concurrent schema changes.
---
 src/rpc/ddl_apply.rs | 28 ++++++++++++++++++++++++----
 src/traft/node.rs    |  6 ++++++
 test/int/test_ddl.py | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index 7f2cd926dd..5afa4155f7 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -10,10 +10,25 @@ use crate::traft::error::Error as TraftError;
 use crate::traft::error::ErrorInfo;
 use crate::traft::node;
 use crate::traft::{RaftIndex, RaftTerm};
+use std::rc::Rc;
 use std::time::Duration;
 use tarantool::error::{BoxError, TarantoolErrorCode};
+use tarantool::fiber;
 use tarantool::transaction::{transaction, TransactionError};
 
+// A global lock for schema changes that solves the following issue:
+// An expensive index creation can cause RPC timeouts, resulting in re-sending of the
+// same RPC even though the operation is still in progress on the master. The second RPC
+// attempts to create the same index, but it realizes that the index already exists,
+// even if the operation has not been completed yet. To handle this scenario, a global
+// lock was added for `apply_schema_change` to prevent concurrent schema changes.
+//
+// Note: The issue was discovered in
+// `<https://git.picodata.io/picodata/picodata/picodata/-/issues/748>`.
+thread_local! {
+    static LOCK: Rc<fiber::Mutex<()>> = Rc::new(fiber::Mutex::new(()));
+}
+
 crate::define_rpc_request! {
     /// Forces the target instance to actually apply the pending schema change locally.
     ///
@@ -34,6 +49,12 @@ crate::define_rpc_request! {
 
         let storage = &node.storage;
 
+        // While the schema change is being applied, repeated RPCs will be blocked by this lock.
+        // Once the change is applied and the lock is released, a repeated RPC will finish quickly
+        // after checking the schema versions.
+        let lock = LOCK.with(Rc::clone);
+        let _guard = lock.lock();
+
         let pending_schema_version = storage.properties.pending_schema_version()?
             .ok_or_else(|| TraftError::other("pending schema version not found"))?;
         // Already applied.
@@ -52,10 +73,6 @@ crate::define_rpc_request! {
         let ddl = storage.properties.pending_schema_change()?
             .ok_or_else(|| TraftError::other("pending schema change not found"))?;
 
-
-        // TODO: transaction may have already started, if we're in a process of
-        // creating a big index. If governor sends a repeat rpc request to us we
-        // should handle this correctly
         let res = transaction(|| apply_schema_change(storage, &ddl, pending_schema_version, false));
         match res {
             Ok(()) => Ok(Response::Ok),
@@ -126,6 +143,9 @@ pub fn apply_schema_change(
 ) -> Result<(), Error> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
 
+    // Injection allowing to block the transaction for an indefinite period of time.
+    crate::error_injection!(block "BLOCK_APPLY_SCHEMA_CHANGE_TRANSACTION");
+
     match *ddl {
         Ddl::CreateTable { id, .. } => {
             let abort_reason = ddl_create_space_on_master(storage, id).map_err(Error::Other)?;
diff --git a/src/traft/node.rs b/src/traft/node.rs
index fc4b1c25d8..f1879a665a 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -965,6 +965,12 @@ impl NodeImpl {
                         return SleepAndRetry;
                     } else {
                         // Master applies schema change at this point.
+                        // Note: Unlike RPC handler `proc_apply_schema_change`, there is no need
+                        // for a schema change lock. When an instance is catching up to the cluster,
+                        // RPCs will be blocked waiting for the applied index from the request to
+                        // be applied on master *, so no concurrent changes can happen.
+                        //
+                        // * https://git.picodata.io/picodata/picodata/picodata/-/blob/ccba5cf1956e41b31eac8cdfacd0e4344033dda1/src/rpc/ddl_apply.rs#L32
                         let res = rpc::ddl_apply::apply_schema_change(
                             &self.storage,
                             &ddl,
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index c29312b9ce..0dbeeced41 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -1528,3 +1528,35 @@ cluster:
             """,
         )
         assert query["row_count"] == 1
+
+
+def test_long_term_transaction_causing_rpc_timeouts(cluster: Cluster):
+    """
+    This test is designed to reproduce the issue described in
+    https://git.picodata.io/picodata/picodata/picodata/-/issues/748
+    """
+    i1, i2, _ = cluster.deploy(instance_count=3)
+
+    ddl = i1.sql("CREATE TABLE t (id INT PRIMARY KEY, data INT, data2 INT)")
+    assert ddl["row_count"] == 1
+
+    # Simulate a long-term transaction by blocking the next schema change for 1.5 seconds.
+    # The RPC timeout is set to 1 second, so this block will trigger an RPC timeout.
+    # After the timeout occurs, another RPC will be re-sent and blocked by the schema change lock.
+    # Once the injection is disabled, the initial RPC will create the index, complete
+    # the transaction and release the lock, allowing subsequent RPC to begin and send an
+    # acknowledgement to the governor.
+    i2.eval(
+        """
+        local fiber = require('fiber')
+        function block_next_apply_schema_change_transaction_for_one_and_a_half_secs()
+            pico._inject_error("BLOCK_APPLY_SCHEMA_CHANGE_TRANSACTION", true)
+            fiber.sleep(1.5)
+            pico._inject_error("BLOCK_APPLY_SCHEMA_CHANGE_TRANSACTION", false)
+        end
+        fiber.create(block_next_apply_schema_change_transaction_for_one_and_a_half_secs)
+        """
+    )
+
+    ddl = i1.sql("CREATE INDEX tdata ON t (data) OPTION (TIMEOUT = 3)")
+    assert ddl["row_count"] == 1
-- 
GitLab