From c1843ece17e6dbf44ef43a781d7a17ecc568c174 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Mon, 30 Jan 2023 12:51:14 +0700
Subject: [PATCH] fix: do not prepare DML on storage

---
 sbroad-cartridge/src/cartridge/storage.rs | 171 ++++++++++++++--------
 1 file changed, 112 insertions(+), 59 deletions(-)

diff --git a/sbroad-cartridge/src/cartridge/storage.rs b/sbroad-cartridge/src/cartridge/storage.rs
index bad6ce331e..d952c29312 100644
--- a/sbroad-cartridge/src/cartridge/storage.rs
+++ b/sbroad-cartridge/src/cartridge/storage.rs
@@ -1,6 +1,7 @@
 use crate::api::exec_query::protocol::{EncodedOptionalData, OptionalData, RequiredData};
 use crate::cartridge::config::StorageConfiguration;
 use crate::cartridge::update_tracing;
+use sbroad::backend::sql::ir::{PatternWithParams, TmpSpaceMap};
 use sbroad::errors::{Action, Entity, SbroadError};
 use sbroad::executor::bucket::Buckets;
 use sbroad::executor::engine::Configuration;
@@ -185,57 +186,93 @@ impl StorageRuntime {
         &self,
         required: &mut RequiredData,
         raw_optional: &mut Vec<u8>,
+    ) -> Result<Box<dyn Any>, SbroadError> {
+        if required.can_be_cached {
+            return self.execute_cacheable_plan(required, raw_optional);
+        }
+        Self::execute_non_cacheable_plan(required, raw_optional)
+    }
+
+    #[allow(unused_variables)]
+    pub fn execute_non_cacheable_plan(
+        required: &mut RequiredData,
+        raw_optional: &mut Vec<u8>,
     ) -> Result<Box<dyn Any>, SbroadError> {
         let plan_id = required.plan_id.clone();
 
-        // Use all buckets as we don't want to filter any data from the execution plan
-        // (this work has already been done on the coordinator).
-        let buckets = Buckets::All;
+        if required.can_be_cached {
+            return Err(SbroadError::Invalid(
+                Entity::Plan,
+                Some("Expected a plan that can not be cached.".to_string()),
+            ));
+        }
+
+        let (pattern_with_params, tmp_spaces) = compile_encoded_optional(raw_optional)?;
+        debug!(
+            Option::from("execute"),
+            &format!(
+                "Failed to execute the statement: {}",
+                pattern_with_params.pattern
+            ),
+        );
+        let result = if required.query_type == QueryType::DML {
+            write_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
+        } else {
+            read_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
+        };
+        for space in tmp_spaces {
+            drop(space);
+        }
+        result
+    }
+
+    #[allow(unused_variables)]
+    pub fn execute_cacheable_plan(
+        &self,
+        required: &mut RequiredData,
+        raw_optional: &mut Vec<u8>,
+    ) -> Result<Box<dyn Any>, SbroadError> {
+        let plan_id = required.plan_id.clone();
+
+        if !required.can_be_cached {
+            return Err(SbroadError::Invalid(
+                Entity::Plan,
+                Some("Expected a plan that can be cached.".to_string()),
+            ));
+        }
 
         // Look for the prepared statement in the cache.
-        if required.can_be_cached {
-            if let Some(stmt) = self
-                .cache
-                .try_borrow_mut()
-                .map_err(|e| {
-                    SbroadError::FailedTo(Action::Borrow, Some(Entity::Cache), format!("{e}"))
-                })?
-                .get(&plan_id)?
-            {
-                let stmt_id = stmt.id()?;
-                // The statement was found in the cache, so we can execute it.
-                debug!(
-                    Option::from("execute plan"),
-                    &format!("Execute prepared statement {stmt:?}"),
-                );
-                let result = match required.query_type {
-                    QueryType::DML => write_prepared(stmt_id, "", &required.parameters),
-                    QueryType::DQL => read_prepared(stmt_id, "", &required.parameters),
-                };
-
-                // If prepared statement is invalid for some reason, fallback to the long pass
-                // and recompile the query.
-                if result.is_ok() {
-                    return result;
-                }
-            }
+        if let Some(stmt) = self
+            .cache
+            .try_borrow_mut()
+            .map_err(|e| {
+                SbroadError::FailedTo(Action::Borrow, Some(Entity::Cache), format!("{e}"))
+            })?
+            .get(&plan_id)?
+        {
+            let stmt_id = stmt.id()?;
+            // The statement was found in the cache, so we can execute it.
             debug!(
                 Option::from("execute plan"),
-                &format!("Failed to find a plan (id {plan_id}) in the cache."),
+                &format!("Execute prepared statement: {stmt:?}"),
             );
+            let result = match required.query_type {
+                QueryType::DML => write_prepared(stmt_id, "", &required.parameters),
+                QueryType::DQL => read_prepared(stmt_id, "", &required.parameters),
+            };
+
+            // If prepared statement is invalid for some reason, fallback to the long pass
+            // and recompile the query.
+            if result.is_ok() {
+                return result;
+            }
         }
+        debug!(
+            Option::from("execute plan"),
+            &format!("Failed to find a plan (id {plan_id}) in the cache."),
+        );
 
-        // Find a statement in the Tarantool's cache or prepare it
-        // (i.e. compile and put into the cache).
-        let data = std::mem::take(raw_optional);
-        let mut optional = OptionalData::try_from(EncodedOptionalData::from(data))?;
-        optional.exec_plan.get_mut_ir_plan().restore_constants()?;
-        let nodes = optional.ordered.to_syntax_data()?;
-        let (pattern_with_params, tmp_spaces) = optional.exec_plan.to_sql(
-            &nodes,
-            &buckets,
-            &uuid::Uuid::new_v4().as_simple().to_string(),
-        )?;
+        let (pattern_with_params, tmp_spaces) = compile_encoded_optional(raw_optional)?;
         let result = match prepare(&pattern_with_params.pattern) {
             Ok(stmt) => {
                 let stmt_id = stmt.id()?;
@@ -247,22 +284,20 @@ impl StorageRuntime {
                         stmt.pattern()?
                     ),
                 );
-                if required.can_be_cached {
-                    self.cache
-                        .try_borrow_mut()
-                        .map_err(|e| {
-                            SbroadError::FailedTo(
-                                Action::Put,
-                                None,
-                                format!("prepared statement {stmt:?} into the cache: {e:?}"),
-                            )
-                        })?
-                        .put(plan_id, stmt)?;
-                }
+                self.cache
+                    .try_borrow_mut()
+                    .map_err(|e| {
+                        SbroadError::FailedTo(
+                            Action::Put,
+                            None,
+                            format!("prepared statement {stmt:?} into the cache: {e:?}"),
+                        )
+                    })?
+                    .put(plan_id, stmt)?;
                 // The statement was found in the cache, so we can execute it.
                 debug!(
                     Option::from("execute plan"),
-                    &format!("Execute prepared statement {stmt_id}"),
+                    &format!("Execute prepared statement: {stmt_id}"),
                 );
                 if required.query_type == QueryType::DML {
                     write_prepared(
@@ -290,12 +325,10 @@ impl StorageRuntime {
                     ),
                 );
                 if required.query_type == QueryType::DML {
-                    return write_unprepared(
-                        &pattern_with_params.pattern,
-                        &pattern_with_params.params,
-                    );
+                    write_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
+                } else {
+                    read_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
                 }
-                read_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
             }
         };
         for space in tmp_spaces {
@@ -305,6 +338,26 @@ impl StorageRuntime {
     }
 }
 
+fn compile_encoded_optional(
+    raw_optional: &mut Vec<u8>,
+) -> Result<(PatternWithParams, TmpSpaceMap), SbroadError> {
+    // Use all buckets as we don't want to filter any data from the execution plan
+    // (this work has already been done on the coordinator).
+    let buckets = Buckets::All;
+
+    // Find a statement in the Tarantool's cache or prepare it
+    // (i.e. compile and put into the cache).
+    let data = std::mem::take(raw_optional);
+    let mut optional = OptionalData::try_from(EncodedOptionalData::from(data))?;
+    optional.exec_plan.get_mut_ir_plan().restore_constants()?;
+    let nodes = optional.ordered.to_syntax_data()?;
+    optional.exec_plan.to_sql(
+        &nodes,
+        &buckets,
+        &uuid::Uuid::new_v4().as_simple().to_string(),
+    )
+}
+
 #[otm_child_span("tarantool.statement.prepare")]
 fn prepare(pattern: &str) -> Result<PreparedStmt, SbroadError> {
     let lua = tarantool::lua_state();
-- 
GitLab