From 97367b884822830a42dada223019d4944d182fb2 Mon Sep 17 00:00:00 2001
From: Arseniy Volynets <vol0ncar@yandex.ru>
Date: Tue, 9 Jan 2024 15:19:22 +0300
Subject: [PATCH] feat: change Values distribution to Global

---
 .../arbitrary_expressions_test.lua            |  6 ----
 sbroad-core/src/executor.rs                   | 19 +++++-----
 sbroad-core/src/executor/engine/helpers.rs    | 14 ++++----
 sbroad-core/src/frontend/sql/ir/tests.rs      | 36 +++++++++----------
 sbroad-core/src/ir/explain/tests.rs           |  4 +--
 sbroad-core/src/ir/expression/tests.rs        |  2 +-
 .../src/ir/transformation/redistribution.rs   | 14 +-------
 7 files changed, 37 insertions(+), 58 deletions(-)

diff --git a/sbroad-cartridge/test_app/test/integration/arbitrary_expressions_test.lua b/sbroad-cartridge/test_app/test/integration/arbitrary_expressions_test.lua
index 1890c7ad1e..047766440a 100644
--- a/sbroad-cartridge/test_app/test/integration/arbitrary_expressions_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/arbitrary_expressions_test.lua
@@ -66,12 +66,6 @@ arbitrary_projection.test_arbitrary_invalid = function()
         select cast("id" * 2 > 0 as boolean), cast("id" * 2 > 0 as boolean) as "cast" from "arithmetic_space"
     ]], {} })
     t.assert_str_contains(tostring(err), "rule parsing error")
-
-    -- selection from values without cast
-    local _, err = api:call("sbroad.execute", { [[
-        SELECT "id" FROM "arithmetic_space" WHERE "id" IN (SELECT * FROM (VALUES (1)))
-    ]], {} })
-    t.assert_str_contains(tostring(err), "Sbroad Error: type any not implemented")
 end
 
 arbitrary_projection.test_arbitrary_valid = function()
diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs
index 450a80bcda..2d5379a484 100644
--- a/sbroad-core/src/executor.rs
+++ b/sbroad-core/src/executor.rs
@@ -207,15 +207,9 @@ where
 
                 if let Relational::Motion { policy, .. } = motion {
                     match policy {
-                        // Local segment motions should be treated as a special case.
-                        // 1. If we can materialize it on the router, then we should do it
-                        //    (the child node is `VALUES` of constants).
-                        // 2. Otherwise we should skip it and dispatch the query to the segments
-                        //    (materialization would be done on the segments). Note that we
-                        //    will operate with vtables for LocalSegment motions via calls like
-                        //    `if let Ok(virtual_table) = self.exec_plan.get_motion_vtable(node_id)`
-                        //    in order to define whether virtual table was materialized for values.
-                        MotionPolicy::LocalSegment(_) => {
+                        MotionPolicy::Segment(_) => {
+                            // if child is values, then we can materialize it
+                            // on the router.
                             if let Some(virtual_table) =
                                 materialize_values(&mut self.exec_plan, *motion_id)?
                             {
@@ -226,6 +220,13 @@ where
                                 )?;
                                 self.get_mut_exec_plan().unlink_motion_subtree(*motion_id)?;
                             }
+                        }
+                        // Skip it and dispatch the query to the segments
+                        // (materialization would be done on the segments). Note that we
+                        // will operate with vtables for LocalSegment motions via calls like
+                        // `if let Ok(virtual_table) = self.exec_plan.get_motion_vtable(node_id)`
+                        // in order to define whether virtual table was materialized for values.
+                        MotionPolicy::LocalSegment(_) => {
                             continue;
                         }
                         // Local policy should be skipped and dispatched to the segments:
diff --git a/sbroad-core/src/executor/engine/helpers.rs b/sbroad-core/src/executor/engine/helpers.rs
index 0206c889ac..7e00a93bfc 100644
--- a/sbroad-core/src/executor/engine/helpers.rs
+++ b/sbroad-core/src/executor/engine/helpers.rs
@@ -697,10 +697,12 @@ pub(crate) fn materialize_values(
     // to constants, we have to rewrite this code (need to check that there are
     // no subqueries before node replacement).
     let child_id = plan.get_motion_child(motion_node_id)?;
-    if let Relational::Values { .. } = plan.get_ir_plan().get_relation_node(child_id)? {
-    } else {
+    if !matches!(
+        plan.get_ir_plan().get_relation_node(child_id)?,
+        Relational::Values { .. }
+    ) {
         return Ok(None);
-    };
+    }
     let child_node_ref = plan.get_mut_ir_plan().get_mut_node(child_id)?;
     let child_node = std::mem::replace(child_node_ref, Node::Parameter);
     if let Node::Relational(Relational::Values {
@@ -746,11 +748,7 @@ pub(crate) fn materialize_values(
                 }
                 let data = *data;
                 // Check that all the values are constants.
-                let columns_len = plan
-                    .get_ir_plan()
-                    .get_expression_node(data)?
-                    .get_row_list()?
-                    .len();
+                let columns_len = plan.get_ir_plan().get_row_list(data)?.len();
                 let mut row: VTableTuple = Vec::with_capacity(columns_len);
                 for idx in 0..columns_len {
                     let column_id =
diff --git a/sbroad-core/src/frontend/sql/ir/tests.rs b/sbroad-core/src/frontend/sql/ir/tests.rs
index b2dd66bbcc..ec4b1e4169 100644
--- a/sbroad-core/src/frontend/sql/ir/tests.rs
+++ b/sbroad-core/src/frontend/sql/ir/tests.rs
@@ -271,7 +271,7 @@ fn front_sql10() {
 
     let expected_explain = String::from(
         r#"insert "t" on conflict: fail
-    motion [policy: local segment([ref("COLUMN_1"), ref("COLUMN_2")])]
+    motion [policy: segment([ref("COLUMN_1"), ref("COLUMN_2")])]
         values
             value row (data=ROW(1::unsigned, 2::unsigned, 3::unsigned, 4::unsigned))
 execution options:
@@ -291,7 +291,7 @@ fn front_sql11() {
 
     let expected_explain = String::from(
         r#"insert "t" on conflict: fail
-    motion [policy: local segment([value(NULL), ref("COLUMN_1")])]
+    motion [policy: segment([value(NULL), ref("COLUMN_1")])]
         values
             value row (data=ROW(1::unsigned, 2::unsigned))
 execution options:
@@ -2203,7 +2203,7 @@ fn front_sql_insert_on_conflict() {
     let mut plan = sql_to_optimized_ir(input, vec![]);
     let mut expected_explain = String::from(
         r#"insert "t" on conflict: nothing
-    motion [policy: local segment([ref("COLUMN_1"), ref("COLUMN_2")])]
+    motion [policy: segment([ref("COLUMN_1"), ref("COLUMN_2")])]
         values
             value row (data=ROW(1::unsigned, 1::unsigned, 1::unsigned, 1::unsigned))
 execution options:
@@ -2217,7 +2217,7 @@ vtable_max_rows = 5000
     plan = sql_to_optimized_ir(input, vec![]);
     expected_explain = String::from(
         r#"insert "t" on conflict: replace
-    motion [policy: local segment([ref("COLUMN_1"), ref("COLUMN_2")])]
+    motion [policy: segment([ref("COLUMN_1"), ref("COLUMN_2")])]
         values
             value row (data=ROW(1::unsigned, 1::unsigned, 1::unsigned, 1::unsigned))
 execution options:
@@ -2339,7 +2339,7 @@ fn front_sql_insert_6() {
     let plan = sql_to_optimized_ir(input, vec![]);
     let expected_explain = String::from(
         r#"insert "t" on conflict: fail
-    motion [policy: local segment([ref("COLUMN_5"), ref("COLUMN_6")])]
+    motion [policy: segment([ref("COLUMN_5"), ref("COLUMN_6")])]
         values
             value row (data=ROW(1::unsigned, 2::unsigned))
             value row (data=ROW(1::unsigned, 2::unsigned))
@@ -2394,7 +2394,7 @@ fn front_sql_insert_9() {
     let plan = sql_to_optimized_ir(input, vec![Value::from(1_u64), Value::from(2_u64)]);
     let expected_explain = String::from(
         r#"insert "t" on conflict: fail
-    motion [policy: local segment([ref("COLUMN_1"), ref("COLUMN_2")])]
+    motion [policy: segment([ref("COLUMN_1"), ref("COLUMN_2")])]
         values
             value row (data=ROW(1::unsigned, 2::unsigned))
 execution options:
@@ -2698,12 +2698,11 @@ fn front_sql_not_exists() {
             values
                 value row (data=ROW(1::unsigned))
 subquery $0:
-motion [policy: full]
-            scan
-                projection ("COLUMN_2"::unsigned -> "COLUMN_2")
-                    scan
-                        values
-                            value row (data=ROW(1::unsigned))
+scan
+            projection ("COLUMN_2"::unsigned -> "COLUMN_2")
+                scan
+                    values
+                        value row (data=ROW(1::unsigned))
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -2761,13 +2760,12 @@ fn front_sql_not_complex_query() {
                     projection (not not ROW("test_space"."id"::unsigned) -> "nnid")
                         scan "test_space"
 subquery $0:
-motion [policy: full]
-            scan
-                projection ("COLUMN_1"::unsigned -> "COLUMN_1")
-                    selection not ROW(true::boolean) = ROW(true::boolean)
-                        scan
-                            values
-                                value row (data=ROW(1::unsigned))
+scan
+            projection ("COLUMN_1"::unsigned -> "COLUMN_1")
+                selection not ROW(true::boolean) = ROW(true::boolean)
+                    scan
+                        values
+                            value row (data=ROW(1::unsigned))
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
diff --git a/sbroad-core/src/ir/explain/tests.rs b/sbroad-core/src/ir/explain/tests.rs
index 672c1f007b..5699c59e39 100644
--- a/sbroad-core/src/ir/explain/tests.rs
+++ b/sbroad-core/src/ir/explain/tests.rs
@@ -345,7 +345,7 @@ fn insert_plan() {
     let mut actual_explain = String::new();
     actual_explain.push_str(
         r#"insert "test_space" on conflict: fail
-    motion [policy: local segment([ref("COLUMN_1")])]
+    motion [policy: segment([ref("COLUMN_1")])]
         values
             value row (data=ROW(1::unsigned, '123'::string))
 execution options:
@@ -369,7 +369,7 @@ fn multiply_insert_plan() {
     let mut actual_explain = String::new();
     actual_explain.push_str(
         r#"insert "test_space" on conflict: fail
-    motion [policy: local segment([ref("COLUMN_5")])]
+    motion [policy: segment([ref("COLUMN_5")])]
         values
             value row (data=ROW(1::unsigned, '123'::string))
             value row (data=ROW(2::unsigned, '456'::string))
diff --git a/sbroad-core/src/ir/expression/tests.rs b/sbroad-core/src/ir/expression/tests.rs
index 7af24c668c..d8ba6da0d8 100644
--- a/sbroad-core/src/ir/expression/tests.rs
+++ b/sbroad-core/src/ir/expression/tests.rs
@@ -4,7 +4,7 @@ use pretty_assertions::assert_eq;
 
 use crate::ir::relation::{Column, SpaceEngine, Table, Type};
 use crate::ir::value::Value;
-use crate::ir::{Plan, SbroadError};
+use crate::ir::Plan;
 
 #[test]
 fn row_duplicate_column_names() {
diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs
index 79baa78f66..c5275486e0 100644
--- a/sbroad-core/src/ir/transformation/redistribution.rs
+++ b/sbroad-core/src/ir/transformation/redistribution.rs
@@ -1718,16 +1718,6 @@ impl Plan {
                 }
             }
         }
-        if let Relational::Values { .. } = self.get_relation_node(child_id)? {
-            if let Distribution::Any = child_dist {
-                map.add_child(
-                    child_id,
-                    MotionPolicy::LocalSegment(motion_key),
-                    Program::default(),
-                );
-                return Ok(map);
-            }
-        }
 
         map.add_child(
             child_id,
@@ -2001,9 +1991,7 @@ impl Plan {
                     self.set_distribution(output)?;
                 }
                 Relational::Values { output, .. } => {
-                    // TODO(ars): replace with Global, when it is fully
-                    // supported.
-                    self.set_dist(output, Distribution::Any)?;
+                    self.set_dist(output, Distribution::Global)?;
                 }
                 Relational::Projection {
                     output: proj_output_id,
-- 
GitLab