From 0c36f8f9aaddca5e45eaa2944c14c703b3a3c3c5 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 13 Aug 2024 02:18:56 +0300
Subject: [PATCH] fix: make plugin operations properly reenterable

---
 src/plugin/migration.rs |  55 ++++---
 src/plugin/mod.rs       | 353 ++++++++++++++++++++++------------------
 src/sql.rs              |   2 +-
 test/int/test_plugin.py |   6 -
 4 files changed, 230 insertions(+), 186 deletions(-)

diff --git a/src/plugin/migration.rs b/src/plugin/migration.rs
index ae53ff4547..b216788db2 100644
--- a/src/plugin/migration.rs
+++ b/src/plugin/migration.rs
@@ -1,5 +1,7 @@
 use crate::cbus::ENDPOINT_NAME;
-use crate::plugin::{do_plugin_cas, PluginIdentifier, PLUGIN_DIR};
+use crate::plugin::reenterable_plugin_cas_request;
+use crate::plugin::PreconditionCheckResult;
+use crate::plugin::{PluginIdentifier, PLUGIN_DIR};
 use crate::schema::ADMIN_ID;
 use crate::storage::ClusterwideTable;
 use crate::traft::node;
@@ -446,17 +448,21 @@ fn down_single_file_with_commit(
     applier: &impl SqlApplier,
     deadline: Instant,
 ) {
+    let node = node::global().expect("node must be already initialized");
+
     down_single_file(queries, applier);
 
-    let node = node::global().expect("node must be already initialized");
-    let dml = Dml::delete(
-        ClusterwideTable::PluginMigration,
-        &[plugin_name, &queries.filename_from_manifest],
-        ADMIN_ID,
-    )
-    .expect("encoding should not fail");
+    let make_op = || {
+        let dml = Dml::delete(
+            ClusterwideTable::PluginMigration,
+            &[plugin_name, &queries.filename_from_manifest],
+            ADMIN_ID,
+        )?;
+        Ok(PreconditionCheckResult::DoOp(Op::Dml(dml)))
+    };
+
     tlog!(Debug, "updating global storage with DOWN migration");
-    if let Err(e) = do_plugin_cas(node, Op::Dml(dml), vec![], None, deadline) {
+    if let Err(e) = reenterable_plugin_cas_request(node, make_op, vec![], deadline) {
         tlog!(
             Debug,
             "failed: updating global storage with regular DOWN migration progress: {e}"
@@ -554,20 +560,31 @@ pub fn apply_up_migrations(
             }
         };
 
-        let dml = Dml::replace(
-            ClusterwideTable::PluginMigration,
-            &(
-                &plugin_ident.name,
-                &migration.filename_from_manifest,
-                &format!("{:x}", hash),
-            ),
-            ADMIN_ID,
-        )?;
+        let make_op = || {
+            let dml = Dml::replace(
+                ClusterwideTable::PluginMigration,
+                &(
+                    &plugin_ident.name,
+                    &migration.filename_from_manifest,
+                    &format!("{:x}", hash),
+                ),
+                ADMIN_ID,
+            )?;
+            Ok(PreconditionCheckResult::DoOp(Op::Dml(dml)))
+        };
+
         #[rustfmt::skip]
         tlog!(Debug, "updating global storage with migrations progress {num}/{migrations_count}");
-        if let Err(e) = do_plugin_cas(node, Op::Dml(dml), vec![], None, deadline) {
+        // FIXME: currently it is possible that migrations will be initiated from
+        // 2 different instances simultaneously which can break some invariants.
+        // What we should do is to introduce a global lock in _pico_property
+        // such that the client first checks if the lock is acquired, then does
+        // a CaS request to acquire the lock and only after that starts doing
+        // the migrations, and releases the lock at the end.
+        if let Err(e) = reenterable_plugin_cas_request(node, make_op, vec![], deadline) {
             #[rustfmt::skip]
             tlog!(Error, "failed: updating global storage with migrations progress: {e}");
+
             handle_err(&seen_queries);
             return Err(Error::UpdateProgress(e.to_string()).into());
         }
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index e49eb5a027..99bd884505 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -24,13 +24,14 @@ use crate::plugin::migration::MigrationInfo;
 use crate::plugin::PluginError::{PluginNotFound, RemoveOfEnabledPlugin};
 use crate::schema::{PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey, ADMIN_ID};
 use crate::storage::{ClusterwideTable, PropertyName};
+use crate::traft::error::Error;
 use crate::traft::node::Node;
 use crate::traft::op::PluginRaftOp;
 use crate::traft::op::{Dml, Op};
 use crate::traft::{node, RaftIndex};
 use crate::unwrap_ok_or;
 use crate::util::effective_user_id;
-use crate::{cas, error_injection, tlog, traft};
+use crate::{cas, tlog, traft};
 
 const DEFAULT_PLUGIN_DIR: &'static str = "/usr/share/picodata/";
 
@@ -261,6 +262,11 @@ impl Manifest {
         }
     }
 
+    #[inline(always)]
+    pub fn plugin_identifier(&self) -> PluginIdentifier {
+        PluginIdentifier::new(self.name.clone(), self.version.clone())
+    }
+
     /// Return plugin service definitions built from manifest.
     pub fn service_defs(&self) -> Vec<ServiceDef> {
         self.services
@@ -465,32 +471,39 @@ pub fn remove_routes(keys: &[ServiceRouteKey], timeout: Duration) -> traft::Resu
     do_routing_table_cas(ops, ranges, timeout)
 }
 
-/// Perform clusterwide CAS operation related to plugin system.
-///
-/// # Arguments
+////////////////////////////////////////////////////////////////////////////////
+// reenterable_plugin_cas_request
+////////////////////////////////////////////////////////////////////////////////
+
+/// Performs a reenterable plugin change request. Goes into a loop where on each
+/// iteration first the preconditions for the operation are checked and the
+/// [`Op`] is constructed. Then the CaS request is sent to the leader. The loop
+/// continues until either the request is successfully accepted, the deadline is
+/// exceeded or a non-retriable error is detected.
 ///
-/// * `node`: instance node
-/// * `op`: CAS operation
-/// * `ranges`: CAS ranges
-/// * `try_again_condition`: callback, if true - then perform CAS later
-/// * `deadline`: deadline of whole operation
-fn do_plugin_cas(
+/// This algorithm must be equivalent to [`crate::sql::reenterable_schema_change_request`].
+/// Also please don't refactor it to extract common code using generics or
+/// function pointers, this will only make it harder to understand.
+fn reenterable_plugin_cas_request(
     node: &Node,
-    op: Op,
+    check_operation_preconditions_and_make_op_for_cas: impl Fn()
+        -> Result<PreconditionCheckResult, Error>,
     ranges: Vec<Range>,
-    try_again_condition: Option<fn(&Node) -> traft::Result<bool>>,
     deadline: Instant,
 ) -> traft::Result<RaftIndex> {
     loop {
         let index = node.read_index(deadline.duration_since(Instant::now_fiber()))?;
-        if let Some(try_again_condition) = try_again_condition {
-            if try_again_condition(node)? {
+
+        let res = check_operation_preconditions_and_make_op_for_cas()?;
+        use PreconditionCheckResult::*;
+        let op = match res {
+            DoOp(op) => op,
+            WaitIndexAndRetry => {
                 node.wait_index(index + 1, deadline.duration_since(Instant::now_fiber()))?;
                 continue;
             }
-        }
-
-        // FIXME: preconditions & operation must be recomputed on each retry
+            AlreadyApplied => return Ok(index),
+        };
 
         let term = raft::Storage::term(&node.raft_storage, index)?;
         let predicate = cas::Predicate {
@@ -523,6 +536,12 @@ fn do_plugin_cas(
     }
 }
 
+enum PreconditionCheckResult {
+    AlreadyApplied,
+    WaitIndexAndRetry,
+    DoOp(Op),
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // External plugin interface
 ////////////////////////////////////////////////////////////////////////////////
@@ -538,28 +557,39 @@ pub fn install_plugin(
     let deadline = fiber::clock().saturating_add(timeout);
     let node = node::global()?;
 
-    let plugin_already_exist = node.storage.plugin.contains(&ident)?;
-    match (if_not_exists, plugin_already_exist) {
-        (true, true) => return Ok(()),
-        (false, true) => return Err(PluginError::AlreadyExist(ident).into()),
-        (_, _) => {}
-    }
-
     let manifest = Manifest::load(&ident)?;
-    let op = PluginOp::InstallPlugin { manifest };
-    let dml = Dml::replace(
-        ClusterwideTable::Property,
-        &(&PropertyName::PendingPluginOperation, &op),
-        effective_user_id(),
-    )?;
-
-    let mut index = do_plugin_cas(
-        node,
-        Op::Dml(dml),
-        vec![Range::new(ClusterwideTable::Property).eq([PropertyName::PendingPluginOperation])],
-        Some(|node| Ok(node.storage.properties.pending_plugin_op()?.is_some())),
-        deadline,
-    )?;
+
+    let check_and_make_op = || {
+        if node.storage.properties.pending_plugin_op()?.is_some() {
+            return Ok(PreconditionCheckResult::WaitIndexAndRetry);
+        }
+
+        let ident = manifest.plugin_identifier();
+        let plugin_already_exists = node.storage.plugin.contains(&ident)?;
+        if plugin_already_exists {
+            if if_not_exists {
+                return Ok(PreconditionCheckResult::AlreadyApplied);
+            } else {
+                return Err(PluginError::AlreadyExist(ident).into());
+            }
+        }
+
+        let op = PluginOp::InstallPlugin {
+            manifest: manifest.clone(),
+        };
+        let dml = Dml::replace(
+            ClusterwideTable::Property,
+            &(&PropertyName::PendingPluginOperation, &op),
+            effective_user_id(),
+        )?;
+        Ok(PreconditionCheckResult::DoOp(Op::Dml(dml)))
+    };
+
+    let ranges = vec![
+        // Fail if someone proposes another plugin operation
+        Range::new(ClusterwideTable::Property).eq([PropertyName::PendingPluginOperation]),
+    ];
+    let mut index = reenterable_plugin_cas_request(node, check_and_make_op, ranges, deadline)?;
 
     while node.storage.properties.pending_plugin_op()?.is_some() {
         index = node.wait_index(index + 1, deadline.duration_since(Instant::now_fiber()))?;
@@ -680,23 +710,31 @@ pub fn enable_plugin(
     timeout: Duration,
 ) -> traft::Result<()> {
     let deadline = Instant::now_fiber().saturating_add(timeout);
-
     let node = node::global()?;
 
-    // FIXME: this must be done in a retry loop within reenterable_plugin_change_request
-    let services = node.storage.service.get_by_plugin(&plugin)?;
-    let op = PluginOp::EnablePlugin {
-        plugin: plugin.clone(),
-        // FIXME: we shouldn't need to send this list, it's already available on
-        // the governor, what is going on?
-        services,
-        timeout: on_start_timeout,
+    let check_and_make_op = || {
+        if node.storage.properties.pending_plugin_op()?.is_some() {
+            return Ok(PreconditionCheckResult::WaitIndexAndRetry);
+        }
+
+        // TODO: check if plugin already enabled
+
+        let services = node.storage.service.get_by_plugin(plugin)?;
+
+        let op = PluginOp::EnablePlugin {
+            plugin: plugin.clone(),
+            // FIXME: we shouldn't need to send this list, it's already available on
+            // the governor, what is going on?
+            services,
+            timeout: on_start_timeout,
+        };
+        let dml = Dml::replace(
+            ClusterwideTable::Property,
+            &(&PropertyName::PendingPluginOperation, &op),
+            effective_user_id(),
+        )?;
+        Ok(PreconditionCheckResult::DoOp(Op::Dml(dml)))
     };
-    let dml = Dml::replace(
-        ClusterwideTable::Property,
-        &(&PropertyName::PendingPluginOperation, &op),
-        effective_user_id(),
-    )?;
 
     let ranges = vec![
         // Fail if someone proposes another plugin operation
@@ -704,14 +742,7 @@ pub fn enable_plugin(
         // Fail if someone updates this plugin record
         Range::new(ClusterwideTable::Plugin).eq([&plugin.name]),
     ];
-
-    let mut index = do_plugin_cas(
-        node,
-        Op::Dml(dml),
-        ranges,
-        Some(|node| Ok(node.storage.properties.pending_plugin_op()?.is_some())),
-        deadline,
-    )?;
+    let mut index = reenterable_plugin_cas_request(node, check_and_make_op, ranges, deadline)?;
 
     while node.storage.properties.pending_plugin_op()?.is_some() {
         index = node.wait_index(index + 1, deadline.duration_since(Instant::now_fiber()))?;
@@ -738,8 +769,8 @@ pub fn update_plugin_service_configuration(
     timeout: Duration,
 ) -> traft::Result<()> {
     let deadline = Instant::now_fiber().saturating_add(timeout);
-
     let node = node::global()?;
+
     node.plugin_manager
         .handle_event_sync(PluginEvent::BeforeServiceConfigurationUpdated {
             ident,
@@ -748,20 +779,22 @@ pub fn update_plugin_service_configuration(
         })?;
 
     let new_cfg: rmpv::Value = rmp_serde::from_slice(new_cfg_raw).expect("out of memory");
-    let op = PluginRaftOp::UpdatePluginConfig {
-        ident: ident.clone(),
-        service_name: service_name.to_string(),
-        config: new_cfg,
+
+    let check_and_make_op = || {
+        let op = PluginRaftOp::UpdatePluginConfig {
+            ident: ident.clone(),
+            service_name: service_name.to_string(),
+            config: new_cfg.clone(),
+        };
+        Ok(PreconditionCheckResult::DoOp(Op::Plugin(op)))
     };
 
-    do_plugin_cas(
-        node,
-        Op::Plugin(op),
-        vec![Range::new(ClusterwideTable::Service).eq((&ident.name, service_name, &ident.version))],
-        None,
-        deadline,
-    )
-    .map(|_| ())
+    let ranges = vec![
+        // Fail if someone updates this service record
+        Range::new(ClusterwideTable::Service).eq((&ident.name, service_name, &ident.version)),
+    ];
+    reenterable_plugin_cas_request(node, check_and_make_op, ranges, deadline)?;
+    Ok(())
 }
 
 /// Disable plugin:
@@ -771,25 +804,28 @@ pub fn update_plugin_service_configuration(
 pub fn disable_plugin(ident: &PluginIdentifier, timeout: Duration) -> traft::Result<()> {
     let deadline = Instant::now_fiber().saturating_add(timeout);
     let node = node::global()?;
-    let op = PluginRaftOp::DisablePlugin {
-        ident: ident.clone(),
-    };
 
-    // it is ok to return error here based on local state,
-    // we expect that in small count of cases
-    // when "plugin does not exist in local state but exist on leader"
-    // user will retry disable manually
-    if !node.storage.plugin.contains(ident)? {
-        return Err(PluginNotFound(ident.clone()).into());
-    }
+    let check_and_make_op = || {
+        if node.storage.properties.pending_plugin_op()?.is_some() {
+            return Ok(PreconditionCheckResult::WaitIndexAndRetry);
+        }
+
+        // TODO: support if_exists option
+        if !node.storage.plugin.contains(ident)? {
+            return Err(PluginNotFound(ident.clone()).into());
+        }
+
+        let op = PluginRaftOp::DisablePlugin {
+            ident: ident.clone(),
+        };
+        Ok(PreconditionCheckResult::DoOp(Op::Plugin(op)))
+    };
 
-    let mut index = do_plugin_cas(
-        node,
-        Op::Plugin(op),
-        vec![Range::new(ClusterwideTable::Plugin).eq([&ident.name])],
-        Some(|node| Ok(node.storage.properties.pending_plugin_op()?.is_some())),
-        deadline,
-    )?;
+    let ranges = vec![
+        // Fail if someone updates this plugin record
+        Range::new(ClusterwideTable::Plugin).eq([&ident.name]),
+    ];
+    let mut index = reenterable_plugin_cas_request(node, check_and_make_op, ranges, deadline)?;
 
     while node.storage.properties.pending_plugin_op()?.is_some() {
         index = node.wait_index(index + 1, deadline.duration_since(Instant::now_fiber()))?;
@@ -804,61 +840,59 @@ pub fn disable_plugin(ident: &PluginIdentifier, timeout: Duration) -> traft::Res
 ///
 /// * `ident`: identity of plugin to remove
 /// * `timeout`: operation timeout
-pub fn remove_plugin(
-    ident: &PluginIdentifier,
-    timeout: Duration,
-) -> traft::Result<()> {
+pub fn remove_plugin(ident: &PluginIdentifier, timeout: Duration) -> traft::Result<()> {
     let deadline = Instant::now_fiber().saturating_add(timeout);
-
     let node = node::global()?;
-    let Some(plugin) = node.storage.plugin.get(ident)? else {
-        // TODO: support if_exists option
-        #[rustfmt::skip]
-        return Err(traft::error::Error::other(format!("no such plugin `{ident}`")));
-    };
 
-    // we check this condition on any instance, this will allow
-    // to return an error in most situations, but there are still
-    // situations when instance is a follower and has not yet received up-to-date
-    // information from the leader - in this case,
-    // the error will not be returned to client and raft op
-    // must be applied on instances correctly (op should ignore removing if
-    // plugin exists and enabled)
-    if plugin.enabled && !error_injection::is_enabled("PLUGIN_EXIST_AND_ENABLED") {
-        return Err(RemoveOfEnabledPlugin.into());
-    }
+    let check_and_make_op = || {
+        if node.storage.properties.pending_plugin_op()?.is_some() {
+            return Ok(PreconditionCheckResult::WaitIndexAndRetry);
+        }
 
-    let migration_list = node
-        .storage
-        .plugin_migration
-        .get_files_by_plugin(&ident.name)?
-        .into_iter()
-        .map(|rec| rec.migration_file)
-        .collect::<Vec<_>>();
+        let Some(plugin) = node.storage.plugin.get(ident)? else {
+            // TODO: support if_exists option
+            #[rustfmt::skip]
+            return Err(traft::error::Error::other(format!("no such plugin `{ident}`")));
+        };
 
-    if !migration_list.is_empty() {
-        #[rustfmt::skip]
-        return Err(traft::error::Error::other("attempt to remove plugin with applied `UP` migrations"));
-    }
+        if plugin.enabled {
+            return Err(RemoveOfEnabledPlugin.into());
+        }
+
+        let migration_list = node
+            .storage
+            .plugin_migration
+            .get_files_by_plugin(&ident.name)?
+            .into_iter()
+            .map(|rec| rec.migration_file)
+            .collect::<Vec<_>>();
+
+        if !migration_list.is_empty() {
+            #[rustfmt::skip]
+            return Err(traft::error::Error::other("attempt to remove plugin with applied `UP` migrations"));
+        }
 
-    let op = PluginRaftOp::RemovePlugin {
-        ident: ident.clone(),
+        let op = PluginRaftOp::RemovePlugin {
+            ident: ident.clone(),
+        };
+        Ok(PreconditionCheckResult::DoOp(Op::Plugin(op)))
     };
 
-    do_plugin_cas(
-        node,
-        Op::Plugin(op),
-        vec![
-            Range::new(ClusterwideTable::Plugin).eq([&ident.name]),
-            Range::new(ClusterwideTable::Service).eq([&ident.name]),
-        ],
-        None,
-        deadline,
-    )?;
+    let ranges = vec![
+        // Fail if someone updates this plugin record
+        Range::new(ClusterwideTable::Plugin).eq([&ident.name]),
+        // Fail if someone updates any service record of this plugin
+        Range::new(ClusterwideTable::Service).eq([&ident.name]),
+    ];
+    reenterable_plugin_cas_request(node, check_and_make_op, ranges, deadline)?;
 
     Ok(())
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// PluginOp
+////////////////////////////////////////////////////////////////////////////////
+
 #[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
 pub enum PluginOp {
     InstallPlugin {
@@ -922,18 +956,31 @@ fn update_tier(upd_op: TopologyUpdateOp, timeout: Duration) -> traft::Result<()>
     let deadline = Instant::now_fiber().saturating_add(timeout);
     let node = node::global()?;
 
-    let mb_service = node
-        .storage
-        .service
-        .get(upd_op.plugin_identity(), upd_op.service_name())?;
-
-    if mb_service.is_none() {
-        return Err(PluginError::ServiceNotFound(
-            upd_op.service_name().to_string(),
-            upd_op.plugin_identity().clone(),
-        )
-        .into());
-    }
+    let check_and_make_op = || {
+        if node.storage.properties.pending_plugin_op()?.is_some() {
+            return Ok(PreconditionCheckResult::WaitIndexAndRetry);
+        }
+
+        let service = node
+            .storage
+            .service
+            .get(upd_op.plugin_identity(), upd_op.service_name())?;
+        if service.is_none() {
+            return Err(PluginError::ServiceNotFound(
+                upd_op.service_name().to_string(),
+                upd_op.plugin_identity().clone(),
+            )
+            .into());
+        }
+
+        let op = PluginOp::UpdateTopology(upd_op.clone());
+        let dml = Dml::replace(
+            ClusterwideTable::Property,
+            &(&PropertyName::PendingPluginOperation, &op),
+            effective_user_id(),
+        )?;
+        Ok(PreconditionCheckResult::DoOp(Op::Dml(dml)))
+    };
 
     let ident = upd_op.plugin_identity();
     let ranges = vec![
@@ -948,21 +995,7 @@ fn update_tier(upd_op: TopologyUpdateOp, timeout: Duration) -> traft::Result<()>
         // Fail if someone proposes another plugin operation
         Range::new(ClusterwideTable::Property).eq([PropertyName::PendingPluginOperation]),
     ];
-
-    let op = PluginOp::UpdateTopology(upd_op.clone());
-    let dml = Dml::replace(
-        ClusterwideTable::Property,
-        &(&PropertyName::PendingPluginOperation, &op),
-        effective_user_id(),
-    )?;
-
-    let mut index = do_plugin_cas(
-        node,
-        Op::Dml(dml),
-        ranges,
-        Some(|node| Ok(node.storage.properties.pending_plugin_op()?.is_some())),
-        deadline,
-    )?;
+    let mut index = reenterable_plugin_cas_request(node, check_and_make_op, ranges, deadline)?;
 
     while node.storage.properties.pending_plugin_op()?.is_some() {
         index = node.wait_index(index + 1, deadline.duration_since(Instant::now_fiber()))?;
diff --git a/src/sql.rs b/src/sql.rs
index e81b0c8619..f43beda35e 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -1238,7 +1238,7 @@ fn ddl_ir_node_to_op_or_result(
     }
 }
 
-fn reenterable_schema_change_request(
+pub(crate) fn reenterable_schema_change_request(
     node: &TraftNode,
     ir_node: IrNode,
 ) -> traft::Result<ConsumerResult> {
diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py
index c926b466c6..6e24f75459 100644
--- a/test/int/test_plugin.py
+++ b/test/int/test_plugin.py
@@ -505,12 +505,6 @@ def test_plugin_remove(cluster: Cluster):
         i1.call("pico.remove_plugin", _PLUGIN, _PLUGIN_VERSION_1)
     plugin_ref.assert_synced()
 
-    i1.call("pico._inject_error", "PLUGIN_EXIST_AND_ENABLED", True)
-    # same, but error not returned to a client
-    i1.call("pico.remove_plugin", _PLUGIN, _PLUGIN_VERSION_1)
-    plugin_ref.assert_synced()
-    i1.call("pico._inject_error", "PLUGIN_EXIST_AND_ENABLED", False)
-
     # check default behaviour
     i1.call("pico.disable_plugin", _PLUGIN, _PLUGIN_VERSION_1)
     plugin_ref = plugin_ref.enable(False).set_topology({i1: [], i2: []})
-- 
GitLab