From 2f253e1e5e190d47745890fc8ce094219ad5a993 Mon Sep 17 00:00:00 2001
From: godzie44 <godzie@yandex.ru>
Date: Wed, 21 Aug 2024 17:24:31 +0300
Subject: [PATCH] fix(plugin): add lock for plugin migrations

Closes: #837
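
Plugin migrations are now guarded by a cluster-wide lock stored in
`ClusterwideTable::Property` under the `PendingPluginOperation` key.
The lock records the holder's instance id together with its state
incarnation, so a lock left behind by an instance that went offline,
was expelled, or restarted with a new incarnation can be taken over
by another instance. The migration appliers re-check lock ownership
before every DML they submit, so a holder that has lost the lock
fails fast instead of clobbering the progress of whoever took it over.

A sketch of the resulting flow in `migration_up` (this mirrors the
change to src/plugin/mod.rs below):

    lock::try_acquire(deadline)?;
    let result = migration::apply_up_migrations(ident, &migration_delta, deadline, rollback_timeout);
    lock::release(deadline)?;
    result.map_err(Error::Plugin)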
---
 src/plugin/lock.rs                            | 171 ++++++++++++++++++
 src/plugin/migration.rs                       |  16 +-
 src/plugin/mod.rs                             |  21 ++-
 test/conftest.py                              |   1 +
 test/int/test_plugin.py                       |  79 ++++++++
 .../testplug_w_migration_2/0.1.0/author.db    |   8 +
 .../0.1.0/manifest.yaml                       |   9 +
 7 files changed, 302 insertions(+), 3 deletions(-)
 create mode 100644 src/plugin/lock.rs
 create mode 100644 test/testplug/testplug_w_migration_2/0.1.0/author.db
 create mode 100644 test/testplug/testplug_w_migration_2/0.1.0/manifest.yaml

diff --git a/src/plugin/lock.rs b/src/plugin/lock.rs
new file mode 100644
index 0000000000..74ddbe60a6
--- /dev/null
+++ b/src/plugin/lock.rs
@@ -0,0 +1,171 @@
+use crate::cas::Range;
+use crate::info::InstanceInfo;
+use crate::instance::{InstanceId, StateVariant};
+use crate::plugin::migration::Error;
+use crate::plugin::{
+    reenterable_plugin_cas_request, PluginError, PluginOp, PreconditionCheckResult,
+};
+use crate::storage::{ClusterwideTable, PropertyName};
+use crate::traft::node;
+use crate::traft::op::{Dml, Op};
+use crate::util::effective_user_id;
+use crate::{instance, tlog, traft};
+use serde::{Deserialize, Serialize};
+use tarantool::time::Instant;
+
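+/// Lock record stored in [`ClusterwideTable::Property`] under
+/// [`PropertyName::PendingPluginOperation`] while plugin migrations are applied.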
+#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
+pub struct PicoPropertyLock {
+    /// The lock is considered to be acquired by this instance.
+    instance_id: instance::InstanceId,
+    /// State incarnation of the instance at the moment it acquired the lock.
+    ///
+    /// This is used to determine if the instance has terminated while holding the lock.
+    incarnation: u64,
+}
+
+impl PicoPropertyLock {
+    pub fn new(instance_id: InstanceId, incarnation: u64) -> Self {
+        Self {
+            instance_id,
+            incarnation,
+        }
+    }
+}
+
+impl From<InstanceInfo> for PicoPropertyLock {
+    fn from(instance_info: InstanceInfo) -> Self {
+        PicoPropertyLock::new(
+            instance_info.instance_id,
+            instance_info.current_state.incarnation,
+        )
+    }
+}
+
+/// Try to acquire the migration lock.
+///
+/// # Errors
+/// Returns [`PluginError::LockAlreadyAcquired`] if another instance has already acquired the lock.
+/// Returns an error if the underlying CAS request fails.
+pub fn try_acquire(deadline: Instant) -> crate::plugin::Result<()> {
+    let node = node::global().expect("node must be already initialized");
+    let precondition = || {
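+        // NOTE: this closure may be evaluated more than once if the reenterable
+        // CAS request retries, so it must stay free of side effects.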
+        let my_instance_info = InstanceInfo::try_get(node, None).unwrap();
+
+        if let Some(op) = node.storage.properties.pending_plugin_op()? {
+            let PluginOp::MigrationLock(existing_lock) = op else {
+                return Err(PluginError::LockAlreadyAcquired.into());
+            };
+
+            let info = InstanceInfo::try_get(node, Some(&existing_lock.instance_id)).ok();
+            match info {
+                None => {
+                    tlog!(
+                        Warning,
+                        "Lock held by a non-existent instance found, acquiring a new lock"
+                    );
+                }
+                Some(info) if info.current_state.variant != StateVariant::Online => {
+                    tlog!(
+                        Warning,
+                        "Lock held by an offline or expelled instance found, acquiring a new lock"
+                    );
+                }
+                Some(info) if info.current_state.incarnation != existing_lock.incarnation => {
+                    tlog!(
+                        Warning,
+                        "Lock held by an instance whose incarnation has changed, acquiring a new lock"
+                    );
+                }
+                _ => {
+                    return Err(PluginError::LockAlreadyAcquired.into());
+                }
+            }
+        }
+
+        let new_lock = PluginOp::MigrationLock(PicoPropertyLock::from(my_instance_info));
+        let dml = Dml::replace(
+            ClusterwideTable::Property,
+            &(&PropertyName::PendingPluginOperation, new_lock),
+            effective_user_id(),
+        )?;
+        let ranges = vec![
+            // fail if someone else modifies the pending plugin operation concurrently
+            Range::new(ClusterwideTable::Property).eq([PropertyName::PendingPluginOperation]),
+        ];
+
+        Ok(PreconditionCheckResult::DoOp((Op::Dml(dml), ranges)))
+    };
+
+    if let Err(e) = reenterable_plugin_cas_request(node, precondition, deadline) {
+        if matches!(
+            e,
+            traft::error::Error::Plugin(PluginError::LockAlreadyAcquired)
+        ) {
+            return Err(PluginError::LockAlreadyAcquired);
+        }
+
+        return Err(Error::AcquireLock(e.to_string()).into());
+    }
+
+    Ok(())
+}
+
+/// Returns `Ok` if the lock is held by the current instance and is still valid
+/// (i.e. the instance incarnation has not changed since the lock was taken).
+///
+/// # Errors
+/// Returns [`PluginError::LockGoneUnexpected`] if the lock should be held by the
+/// current instance but is not. Returns an error if storage access fails.
+pub fn lock_is_acquired_by_us() -> traft::Result<()> {
+    let node = node::global()?;
+    let instance_info = InstanceInfo::try_get(node, None)?;
+    let Some(PluginOp::MigrationLock(lock)) = node.storage.properties.pending_plugin_op()? else {
+        return Err(PluginError::LockGoneUnexpected.into());
+    };
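+    // The stored lock must reference this instance with its current incarnation;
+    // otherwise the lock was forcefully taken over, e.g. after this instance was
+    // temporarily marked offline.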
+    if lock != PicoPropertyLock::from(instance_info) {
+        return Err(PluginError::LockGoneUnexpected.into());
+    }
+    Ok(())
+}
+
+/// Release the migration lock.
+///
+/// # Errors
+///
+/// Returns an error if the lock has already been released or the underlying CAS request fails.
+pub fn release(deadline: Instant) -> crate::plugin::Result<()> {
+    let node = node::global().expect("node must be already initialized");
+    let precondition = || {
+        let existing_lock = node.storage.properties.pending_plugin_op()?;
+
+        match existing_lock {
+            None => return Err(PluginError::LockAlreadyReleased.into()),
+            Some(PluginOp::MigrationLock(_)) => {}
+            _ => return Err(PluginError::LockGoneUnexpected.into()),
+        };
+
+        let dml = Dml::delete(
+            ClusterwideTable::Property,
+            &[PropertyName::PendingPluginOperation],
+            effective_user_id(),
+        )?;
+
+        let ranges =
+            vec![Range::new(ClusterwideTable::Property).eq([PropertyName::PendingPluginOperation])];
+
+        Ok(PreconditionCheckResult::DoOp((Op::Dml(dml), ranges)))
+    };
+
+    if let Err(e) = reenterable_plugin_cas_request(node, precondition, deadline) {
+        if matches!(
+            e,
+            traft::error::Error::Plugin(PluginError::LockAlreadyReleased)
+        ) {
+            return Err(PluginError::LockAlreadyReleased);
+        }
+
+        return Err(Error::ReleaseLock(e.to_string()).into());
+    }
+
+    Ok(())
+}
diff --git a/src/plugin/migration.rs b/src/plugin/migration.rs
index 68b589945e..959b87a212 100644
--- a/src/plugin/migration.rs
+++ b/src/plugin/migration.rs
@@ -1,7 +1,7 @@
 use crate::cas;
 use crate::cbus::ENDPOINT_NAME;
-use crate::plugin::reenterable_plugin_cas_request;
 use crate::plugin::PreconditionCheckResult;
+use crate::plugin::{lock, reenterable_plugin_cas_request};
 use crate::plugin::{PluginIdentifier, PLUGIN_DIR};
 use crate::schema::ADMIN_ID;
 use crate::storage::ClusterwideTable;
@@ -44,6 +44,12 @@ pub enum Error {
     #[error("Update migration progress: {0}")]
     UpdateProgress(String),
 
+    #[error("Release migration lock: {0}")]
+    ReleaseLock(String),
+
+    #[error("Acquire migration lock: {0}")]
+    AcquireLock(String),
+
     #[error("inconsistent with previous version migration list, reason: {0}")]
     InconsistentMigrationList(String),
 }
@@ -393,6 +399,8 @@ struct SBroadApplier;
 
 impl SqlApplier for SBroadApplier {
     fn apply(&self, sql: &str, deadline: Option<Instant>) -> traft::Result<()> {
+        // Check that the lock is still held by this instance before applying each statement.
+        lock::lock_is_acquired_by_us()?;
         // Should sbroad accept a timeout parameter?
         if let Some(deadline) = deadline {
             if fiber::clock() > deadline {
@@ -454,6 +462,8 @@ fn down_single_file_with_commit(
     down_single_file(queries, applier);
 
     let make_op = || {
+        lock::lock_is_acquired_by_us()?;
+
         let dml = Dml::delete(
             ClusterwideTable::PluginMigration,
             &[plugin_name, &queries.filename_from_manifest],
@@ -487,6 +497,8 @@ pub fn apply_up_migrations(
     deadline: Instant,
     rollback_timeout: Duration,
 ) -> crate::plugin::Result<()> {
+    crate::error_injection!(block "PLUGIN_MIGRATION_LONG_MIGRATION");
+
     // checking the existence of migration files
     let mut migration_files = vec![];
     for file in migrations {
@@ -550,6 +562,8 @@ pub fn apply_up_migrations(
         };
 
         let make_op = || {
+            lock::lock_is_acquired_by_us()?;
+
             let dml = Dml::replace(
                 ClusterwideTable::PluginMigration,
                 &(
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index bc34802cb1..fae8555307 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -1,4 +1,5 @@
 mod ffi;
+pub mod lock;
 pub mod manager;
 pub mod migration;
 pub mod rpc;
@@ -21,6 +22,7 @@ use tarantool::time::Instant;
 
 use crate::cas::Range;
 use crate::info::InstanceInfo;
+use crate::plugin::lock::PicoPropertyLock;
 use crate::plugin::migration::MigrationInfo;
 use crate::plugin::PluginError::{PluginNotFound, RemoveOfEnabledPlugin};
 use crate::schema::{PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey, ADMIN_ID};
@@ -96,6 +98,12 @@ pub enum PluginError {
     AmbiguousInstallCandidate,
     #[error("Cannot specify enable candidate (there should be only one installed plugin version)")]
     AmbiguousEnableCandidate,
+    #[error("Migration lock is already acquired")]
+    LockAlreadyAcquired,
+    #[error("Migration lock is unexpected gone away")]
+    LockGoneUnexpected,
+    #[error("Migration lock is already released")]
+    LockAlreadyReleased,
 }
 
 struct DisplaySomeOrDefault<'a>(&'a Option<ErrorInfo>, &'a str);
@@ -671,8 +679,11 @@ pub fn migration_up(
         return Ok(());
     }
 
-    migration::apply_up_migrations(ident, &migration_delta, deadline, rollback_timeout)?;
-    Ok(())
+    lock::try_acquire(deadline)?;
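+    // Apply the migrations while holding the lock, releasing it even if they
+    // failed; the migration error (if any) is reported afterwards.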
+    let result = migration::apply_up_migrations(ident, &migration_delta, deadline, rollback_timeout);
+    lock::release(deadline)?;
+
+    result.map_err(Error::Plugin)
 }
 
 pub fn migration_down(ident: PluginIdentifier, timeout: Duration) -> traft::Result<()> {
@@ -696,7 +707,10 @@ pub fn migration_down(ident: PluginIdentifier, timeout: Duration) -> traft::Resu
         tlog!(Info, "`DOWN` migrations are up to date");
     }
 
+    lock::try_acquire(deadline)?;
     migration::apply_down_migrations(&ident, &migration_list, deadline);
+    lock::release(deadline)?;
+
     Ok(())
 }
 
@@ -945,6 +959,8 @@ pub fn remove_plugin(ident: &PluginIdentifier, timeout: Duration) -> traft::Resu
             ident: ident.clone(),
         };
         let ranges = vec![
+            // Fail if any plugin migration is in progress
+            Range::new(ClusterwideTable::Property).eq([PropertyName::PendingPluginOperation]),
             // Fail if someone updates this plugin record
             Range::new(ClusterwideTable::Plugin).eq([&ident.name]),
             // Fail if someone updates any service record of this plugin
@@ -978,6 +994,7 @@ pub enum PluginOp {
         tier: String,
         kind: TopologyUpdateOpKind,
     },
+    MigrationLock(PicoPropertyLock),
 }
 
 #[derive(Serialize, Deserialize, PartialEq, Clone, Debug, Copy)]
diff --git a/test/conftest.py b/test/conftest.py
index cf8e462f0d..ffcce84cba 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -2019,6 +2019,7 @@ def binary_path(cargo_build: None) -> str:
         f"{test_dir}/testplug/testplug_small/0.1.0/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_small_svc2/0.1.0/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_w_migration/0.1.0/libtestplug.{ext}",
+        f"{test_dir}/testplug/testplug_w_migration_2/0.1.0/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_w_migration/0.2.0/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_w_migration/0.2.0_broken/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_sdk/0.1.0/libtestplug.{ext}",
diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py
index 50f735df29..b016d13d58 100644
--- a/test/int/test_plugin.py
+++ b/test/int/test_plugin.py
@@ -21,6 +21,7 @@ import requests  # type: ignore
 from conftest import (
     ErrorCode,
 )
+import signal
 
 _3_SEC = 3
 _DEFAULT_CFG = {"foo": True, "bar": 101, "baz": ["one", "two", "three"]}
@@ -35,6 +36,7 @@ _PLUGIN_VERSION_1 = "0.1.0"
 _PLUGIN_VERSION_2 = "0.2.0"
 _DEFAULT_TIER = "default"
 _PLUGIN_WITH_MIGRATION = "testplug_w_migration"
+_PLUGIN_WITH_MIGRATION_2 = "testplug_w_migration_2"
 _PLUGIN_W_SDK = "testplug_sdk"
 _PLUGIN_W_SDK_SERVICES = ["testservice_3"]
 SERVICE_W_RPC = "service_with_rpc_tests"
@@ -1081,6 +1083,83 @@ DROP DATABASE everything;
     assert rows == []
 
 
+def test_migration_lock(cluster: Cluster):
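+    # Scenario: two instances race to apply plugin migrations. The loser fails
+    # with "Migration lock is already acquired". If the lock holder goes offline,
+    # another instance takes the lock over, and the original holder's migration
+    # fails with "Migration lock is already released".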
+    i1 = cluster.add_instance(wait_online=True)
+    i2 = cluster.add_instance(wait_online=False, replicaset_id="storage")
+    i3 = cluster.add_instance(wait_online=False, replicaset_id="storage")
+    cluster.wait_online()
+
+    # Decrease auto_offline_timeout so that the sentinel notices more quickly
+    # that the instance has disappeared
+    i1.sql(
+        """ UPDATE "_pico_property" SET "value" = 1 WHERE "key" = 'auto_offline_timeout' """
+    )
+
+    # successfully install v0.1.0
+    i2.call(
+        "pico.install_plugin",
+        _PLUGIN_WITH_MIGRATION_2,
+        "0.1.0",
+        timeout=5,
+    )
+
+    i2.call("pico._inject_error", "PLUGIN_MIGRATION_LONG_MIGRATION", True)
+    i2.eval(
+        """
+            local fiber = require('fiber')
+            function migrate()
+                local res = {pico.migration_up('testplug_w_migration_2', '0.1.0', {timeout = 20})}
+                rawset(_G, "migration_up_result", res)
+            end
+            fiber.create(migrate)
+    """
+    )
+    time.sleep(1)
+
+    with pytest.raises(ReturnError, match="Migration lock is already acquired"):
+        i3.call(
+            "pico.migration_up",
+            _PLUGIN_WITH_MIGRATION_2,
+            "0.1.0",
+            timeout=10,
+        )
+
+    #
+    # i2 suddenly stops responding before it has finished applying migrations
+    #
+    assert i2.process
+    os.killpg(i2.process.pid, signal.SIGSTOP)
+
+    def check_instance_is_offline(peer: Instance, instance_id):
+        instance_info = peer.call(".proc_instance_info", instance_id)
+        assert instance_info["current_state"]["variant"] == "Offline"
+        assert instance_info["target_state"]["variant"] == "Offline"
+
+    # sentinel has noticed that i2 is offline and changed its state
+    Retriable(timeout=10).call(check_instance_is_offline, i1, i2.instance_id)
+
+    #
+    # i3 can now apply the migrations, because the lock holder is not online
+    #
+    i3.call("pico.migration_up", _PLUGIN_WITH_MIGRATION_2, "0.1.0", timeout=10)
+
+    #
+    # i2 wakes up and attempts to continue with applying the migrations
+    #
+    os.killpg(i2.process.pid, signal.SIGCONT)
+    i2.call("pico._inject_error", "PLUGIN_MIGRATION_LONG_MIGRATION", False)
+
+    def check_migration_up_result(instance: Instance):
+        result = instance.eval("return migration_up_result")
+        assert result is not None
+        return result
+
+    # i2 notices that the lock was forcefully taken away
+    ok, err = Retriable(timeout=10).call(check_migration_up_result, i2)
+    assert ok is None
+    assert err == "Migration lock is already released"
+
+
 # -------------------------- configuration tests -------------------------------------
 
 
diff --git a/test/testplug/testplug_w_migration_2/0.1.0/author.db b/test/testplug/testplug_w_migration_2/0.1.0/author.db
new file mode 100644
index 0000000000..202e9a7b13
--- /dev/null
+++ b/test/testplug/testplug_w_migration_2/0.1.0/author.db
@@ -0,0 +1,8 @@
+-- pico.UP
+
+CREATE TABLE author (id INTEGER NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id))
+USING memtx
+DISTRIBUTED GLOBALLY;
+
+-- pico.DOWN
+DROP TABLE author;
diff --git a/test/testplug/testplug_w_migration_2/0.1.0/manifest.yaml b/test/testplug/testplug_w_migration_2/0.1.0/manifest.yaml
new file mode 100644
index 0000000000..f9dc34374c
--- /dev/null
+++ b/test/testplug/testplug_w_migration_2/0.1.0/manifest.yaml
@@ -0,0 +1,9 @@
+description: plugin for test purposes
+name: testplug_w_migration_2
+version: 0.1.0
+services:
+  - name: testservice_2
+    description: testservice_2 descr
+    default_configuration:
+migration:
+  - author.db
-- 
GitLab