From fbe57a719215bc389a5e9bed995fd73105d06c1b Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Tue, 7 Mar 2023 20:30:03 +0700
Subject: [PATCH] fix: MR review

---
 sbroad-core/src/backend/sql/space.rs          |  6 +--
 sbroad-core/src/executor/result.rs            |  3 +-
 sbroad-core/src/executor/tests.rs             | 22 +++++-----
 sbroad-core/src/frontend/sql/ir/tests.rs      |  8 ++--
 sbroad-core/src/ir/explain/tests.rs           |  6 +--
 sbroad-core/src/ir/expression.rs              | 42 +++++++++++++++++++
 .../transformation/redistribution/insert.rs   | 39 ++++++++---------
 sbroad-core/src/ir/value.rs                   | 21 +++++++++-
 8 files changed, 103 insertions(+), 44 deletions(-)

diff --git a/sbroad-core/src/backend/sql/space.rs b/sbroad-core/src/backend/sql/space.rs
index a41e6fea00..65180fe2dc 100644
--- a/sbroad-core/src/backend/sql/space.rs
+++ b/sbroad-core/src/backend/sql/space.rs
@@ -96,9 +96,9 @@ impl TmpSpace {
                     ));
                 }
             }
-            for (idx, values) in vtable.get_tuples_with_buckets(buckets).iter().enumerate() {
-                let mut extended_value: Vec<EncodedValue> = Vec::with_capacity(values.len() + 1);
-                for (v, c) in values.iter().zip(vtable.get_columns().iter()) {
+            for (idx, tuples) in vtable.get_tuples_with_buckets(buckets).iter().enumerate() {
+                let mut extended_value: Vec<EncodedValue> = Vec::with_capacity(tuples.len() + 1);
+                for (v, c) in tuples.iter().zip(vtable.get_columns().iter()) {
                     let casted_value = v.cast(&c.r#type)?;
                     extended_value.push(EncodedValue::from(casted_value));
                 }
diff --git a/sbroad-core/src/executor/result.rs b/sbroad-core/src/executor/result.rs
index c3df20d812..6898d417d4 100644
--- a/sbroad-core/src/executor/result.rs
+++ b/sbroad-core/src/executor/result.rs
@@ -194,8 +194,7 @@ impl Plan {
     /// - If relational iterator fails to return a correct node.
     pub fn subtree_contains_values(&self, top_id: usize) -> Result<bool, SbroadError> {
         let mut rel_tree = PostOrder::with_capacity(|node| self.nodes.rel_iter(node), REL_CAPACITY);
-        let nodes: Vec<usize> = rel_tree.iter(top_id).map(|(_, id)| id).collect();
-        for node_id in nodes {
+        for (_, node_id) in rel_tree.iter(top_id) {
             let rel = self.get_relation_node(node_id)?;
             if let Relational::Values { .. } = rel {
                 return Ok(true);
diff --git a/sbroad-core/src/executor/tests.rs b/sbroad-core/src/executor/tests.rs
index a24028264e..05dbe1c56f 100644
--- a/sbroad-core/src/executor/tests.rs
+++ b/sbroad-core/src/executor/tests.rs
@@ -946,7 +946,7 @@ fn insert1_test() {
                     r#"SELECT COL_0, bucket_id (coalesce (CAST (? as string), ?) || coalesce (CAST (COL_0 as string), ?)) FROM"#,
                     r#"(SELECT CAST ("t"."a" as unsigned) as COL_0 FROM ((SELECT "a" FROM "TMP_test_142") as "t"))"#,
                 ),
-                vec![Value::Null, Value::String("".into()), Value::String("".into())],
+                vec![Value::Null, Value::String("NULL".into()), Value::String("NULL".into())],
             ))),
         ],
         vec![
@@ -958,7 +958,7 @@ fn insert1_test() {
                     r#"SELECT COL_0, bucket_id (coalesce (CAST (? as string), ?) || coalesce (CAST (COL_0 as string), ?)) FROM"#,
                     r#"(SELECT CAST ("t"."a" as unsigned) as COL_0 FROM ((SELECT "a" FROM "TMP_test_142") as "t"))"#,
                 ),
-                vec![Value::Null, Value::String("".into()), Value::String("".into())],
+                vec![Value::Null, Value::String("NULL".into()), Value::String("NULL".into())],
             ))),
         ],
     ]);
@@ -995,7 +995,7 @@ fn insert2_test() {
                 r#"(SELECT CAST ("t"."a" as unsigned) as COL_0, CAST ("t"."b" as unsigned) as COL_1 FROM"#,
                 r#"(SELECT "t"."a", "t"."b" FROM "t" WHERE ("t"."a") = (?) and ("t"."b") = (?)))"#,
             ),
-            vec![Value::from(""), Value::from(""), Value::from(1_u64), Value::from(2_u64)],
+            vec![Value::from("NULL"), Value::from("NULL"), Value::from(1_u64), Value::from(2_u64)],
         ))),
     ]]);
     assert_eq!(expected, result);
@@ -1064,7 +1064,7 @@ fn insert3_test() {
                     r#"(SELECT CAST ("t"."a" as unsigned) as COL_0, CAST ("t"."b" as unsigned) as COL_1 FROM"#,
                     r#"((SELECT "a","b" FROM "TMP_test_155") as "t"))"#,
                 ),
-                vec![Value::from(""), Value::from("")],
+                vec![Value::from("NULL"), Value::from("NULL")],
             ))),
         ],
         vec![
@@ -1077,7 +1077,7 @@ fn insert3_test() {
                     r#"(SELECT CAST ("t"."a" as unsigned) as COL_0, CAST ("t"."b" as unsigned) as COL_1 FROM"#,
                     r#"((SELECT "a","b" FROM "TMP_test_155") as "t"))"#,
                 ),
-                vec![Value::from(""), Value::from("")],
+                vec![Value::from("NULL"), Value::from("NULL")],
             ))),
         ],
     ]);
@@ -1115,7 +1115,7 @@ fn insert4_test() {
                 r#"(SELECT CAST ("t"."b" as unsigned) as COL_0, CAST ("t"."a" as unsigned) as COL_1 FROM"#,
                 r#"(SELECT "t"."b", "t"."a" FROM "t" WHERE ("t"."a") = (?) and ("t"."b") = (?)))"#,
             ),
-            vec![Value::from(""), Value::from(""), Value::from(1_u64), Value::from(2_u64)],
+            vec![Value::from("NULL"), Value::from("NULL"), Value::from(1_u64), Value::from(2_u64)],
         ))),
     ]]);
     assert_eq!(expected, result);
@@ -1179,7 +1179,7 @@ fn insert5_test() {
                 r#"(SELECT CAST ("COL_1" as unsigned) as COL_0, CAST ("COL_2" as unsigned) as COL_1 FROM"#,
                 r#"((SELECT "COL_1","COL_2" FROM "TMP_test_125") as "t"))"#,
             ),
-            vec![Value::from(""), Value::from("")],
+            vec![Value::from("NULL"), Value::from("NULL")],
         ))),
     ]]);
     assert_eq!(expected, result);
@@ -1247,7 +1247,7 @@ fn insert6_test() {
                     r#"(SELECT CAST (COLUMN_5 as unsigned) as COL_0, CAST (COLUMN_6 as unsigned) as COL_1 FROM"#,
                     r#"((SELECT "COLUMN_5","COLUMN_6" FROM "TMP_test_94") as "t"))"#,
                 ),
-                vec![Value::from(""), Value::from("")],
+                vec![Value::from("NULL"), Value::from("NULL")],
             ))),
         ],
         vec![
@@ -1260,7 +1260,7 @@ fn insert6_test() {
                     r#"(SELECT CAST (COLUMN_5 as unsigned) as COL_0, CAST (COLUMN_6 as unsigned) as COL_1 FROM"#,
                     r#"((SELECT "COLUMN_5","COLUMN_6" FROM "TMP_test_94") as "t"))"#,
                 ),
-                vec![Value::from(""), Value::from("")],
+                vec![Value::from("NULL"), Value::from("NULL")],
             ))),
         ],
     ]);
@@ -1347,7 +1347,7 @@ fn insert8_test() {
                     r#"(SELECT CAST ("hash_single_testing"."identification_number" as int) as COL_0, CAST ("hash_single_testing"."product_code" as string) as COL_1,"#,
                     r#"CAST ("hash_single_testing"."product_units" as bool) as COL_2, CAST ("hash_single_testing"."sys_op" as unsigned) as COL_3 FROM"#,
                     r#"((SELECT "identification_number","product_code","product_units","sys_op" FROM "TMP_test_114") as "hash_single_testing"))"#,
-                    ), vec![Value::from(""), Value::from("")],
+                    ), vec![Value::from("NULL"), Value::from("NULL")],
                 )
             )
         ),
@@ -1414,7 +1414,7 @@ fn insert9_test() {
                 r#"SELECT COL_0, COL_1, bucket_id (coalesce (CAST (COL_0 as string), ?) || coalesce (CAST (COL_1 as string), ?)) FROM"#,
                 r#"(SELECT CAST (COLUMN_1 as unsigned) as COL_0, CAST (COLUMN_2 as unsigned) as COL_1 FROM ((SELECT "COLUMN_1","COLUMN_2" FROM "TMP_test_82") as "t"))"#,
             ),
-            vec![Value::from(""), Value::from("")],
+            vec![Value::from("NULL"), Value::from("NULL")],
         ))),
     ]]);
     assert_eq!(expected, result);
diff --git a/sbroad-core/src/frontend/sql/ir/tests.rs b/sbroad-core/src/frontend/sql/ir/tests.rs
index 8cdc945f2f..206eb42cb4 100644
--- a/sbroad-core/src/frontend/sql/ir/tests.rs
+++ b/sbroad-core/src/frontend/sql/ir/tests.rs
@@ -232,7 +232,7 @@ fn front_sql10() {
 
     let expected_explain = String::from(
         r#"insert "t"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, COL_2 -> COL_2, COL_3 -> COL_3, bucket_id((coalesce(('', COL_0::string)) || coalesce(('', COL_1::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, COL_2 -> COL_2, COL_3 -> COL_3, bucket_id((coalesce(('NULL', COL_0::string)) || coalesce(('NULL', COL_1::string)))))
         scan
             projection (COLUMN_1::unsigned -> COL_0, COLUMN_2::unsigned -> COL_1, COLUMN_3::unsigned -> COL_2, COLUMN_4::unsigned -> COL_3)
                 scan
@@ -253,7 +253,7 @@ fn front_sql11() {
 
     let expected_explain = String::from(
         r#"insert "t"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('', COL_0::string)) || coalesce(('', NULL::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('NULL', COL_0::string)) || coalesce(('NULL', NULL::string)))))
         scan
             projection (COLUMN_1::unsigned -> COL_0, COLUMN_2::unsigned -> COL_1)
                 scan
@@ -274,7 +274,7 @@ fn front_sql14() {
 
     let expected_explain = String::from(
         r#"insert "t"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('', COL_0::string)) || coalesce(('', NULL::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('NULL', COL_0::string)) || coalesce(('NULL', NULL::string)))))
         scan
             projection ("t"."b"::unsigned -> COL_0, "t"."d"::unsigned -> COL_1)
                 scan
@@ -513,7 +513,7 @@ fn front_sql_groupby_insert() {
 
     let expected_explain = String::from(
         r#"insert "t"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('', COL_0::string)) || coalesce(('', NULL::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('NULL', COL_0::string)) || coalesce(('NULL', NULL::string)))))
         scan
             projection ("b"::unsigned -> COL_0, "d"::unsigned -> COL_1)
                 scan
diff --git a/sbroad-core/src/ir/explain/tests.rs b/sbroad-core/src/ir/explain/tests.rs
index 158468a78d..82332d22d6 100644
--- a/sbroad-core/src/ir/explain/tests.rs
+++ b/sbroad-core/src/ir/explain/tests.rs
@@ -315,7 +315,7 @@ fn insert_plan() {
     let mut actual_explain = String::new();
     actual_explain.push_str(
         r#"insert "test_space"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('', COL_0::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('NULL', COL_0::string)))))
         scan
             projection (COLUMN_1::unsigned -> COL_0, COLUMN_2::string -> COL_1)
                 scan
@@ -340,7 +340,7 @@ fn multiply_insert_plan() {
     let mut actual_explain = String::new();
     actual_explain.push_str(
         r#"insert "test_space"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('', COL_0::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('NULL', COL_0::string)))))
         scan
             projection (COLUMN_5::unsigned -> COL_0, COLUMN_6::string -> COL_1)
                 scan
@@ -368,7 +368,7 @@ SELECT "identification_number", "product_code" FROM "hash_testing""#;
     let mut actual_explain = String::new();
     actual_explain.push_str(
         r#"insert "test_space"
-    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('', COL_0::string)))))
+    projection (COL_0 -> COL_0, COL_1 -> COL_1, bucket_id((coalesce(('NULL', COL_0::string)))))
         scan
             projection ("hash_testing"."identification_number"::unsigned -> COL_0, "hash_testing"."product_code"::string -> COL_1)
                 scan
diff --git a/sbroad-core/src/ir/expression.rs b/sbroad-core/src/ir/expression.rs
index 9dd0a037b6..0765317fbc 100644
--- a/sbroad-core/src/ir/expression.rs
+++ b/sbroad-core/src/ir/expression.rs
@@ -212,6 +212,20 @@ impl Expression {
         }
     }
 
+    /// Get a mutable reference to the row children list.
+    ///
+    /// # Errors
+    /// - node isn't `Row`
+    pub fn get_row_list_mut(&mut self) -> Result<&mut Vec<usize>, SbroadError> {
+        match self {
+            Expression::Row { ref mut list, .. } => Ok(list),
+            _ => Err(SbroadError::Invalid(
+                Entity::Expression,
+                Some("node isn't Row type".into()),
+            )),
+        }
+    }
+
     /// Gets alias node name.
     ///
     /// # Errors
@@ -270,6 +284,13 @@ impl Expression {
             }
         }
     }
+
+    /// Flushes parent in the reference node.
+    pub fn flush_parent_in_reference(&mut self) {
+        if let Expression::Reference { parent, .. } = self {
+            *parent = None;
+        }
+    }
 }
 
 impl Nodes {
@@ -1051,6 +1072,27 @@ impl Plan {
         }
         Ok(())
     }
+
+    /// Flush parent to `None` for all references in the expression subtree of the current node.
+    ///
+    /// # Errors
+    /// - node is invalid
+    /// - node is not an expression
+    pub fn flush_parent_in_subtree(&mut self, node_id: usize) -> Result<(), SbroadError> {
+        let mut references: Vec<usize> = Vec::new();
+        let mut subtree =
+            PostOrder::with_capacity(|node| self.nodes.expr_iter(node, false), EXPR_CAPACITY);
+        for (_, id) in subtree.iter(node_id) {
+            if let Node::Expression(Expression::Reference { .. }) = self.get_node(id)? {
+                references.push(id);
+            }
+        }
+        for id in references {
+            let node = self.get_mut_expression_node(id)?;
+            node.flush_parent_in_reference();
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/sbroad-core/src/ir/transformation/redistribution/insert.rs b/sbroad-core/src/ir/transformation/redistribution/insert.rs
index 519a5856c4..03b25c8fe9 100644
--- a/sbroad-core/src/ir/transformation/redistribution/insert.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/insert.rs
@@ -1,4 +1,7 @@
+use std::mem::take;
+
 use crate::errors::{Entity, SbroadError};
+use crate::executor::hash::ToHashString;
 use crate::ir::expression::cast::Type as CastType;
 use crate::ir::expression::Expression;
 use crate::ir::function::{Behavior, Function};
@@ -109,19 +112,13 @@ impl Plan {
         ))
     }
 
-    pub(crate) fn set_insert_columns(
+    pub(crate) fn insert_columns_mut(
         &mut self,
         insert_id: usize,
-        columns: Vec<usize>,
-    ) -> Result<(), SbroadError> {
-        let insert = self.get_mut_relation_node(insert_id).unwrap();
-        if let Relational::Insert {
-            columns: old_columns,
-            ..
-        } = insert
-        {
-            *old_columns = columns;
-            return Ok(());
+    ) -> Result<&mut Vec<usize>, SbroadError> {
+        let insert = self.get_mut_relation_node(insert_id)?;
+        if let Relational::Insert { columns, .. } = insert {
+            return Ok(columns);
         }
         Err(SbroadError::Invalid(
             Entity::Node,
@@ -220,7 +217,7 @@ impl Plan {
     /// insert into t1 (a, bucket_id)
     /// select
     ///   c,
-    ///   bucket_id(coalesce(cast(NULL as string), '') || coalesce(cast(c as string), ''))
+    ///   bucket_id(coalesce(cast(NULL as string), 'NULL') || coalesce(cast(c as string), 'NULL'))
     /// from (select c from t2)
     /// ```
     pub(crate) fn replace_insert_child_with_bucket_id_sq(
@@ -236,10 +233,14 @@ impl Plan {
 
         // Create a new row for the bucket_id projection.
         let proj_output_id = self.clone_expr_subtree(sq_output_id)?;
-        let mut new_row = self.get_expression_node(proj_output_id)?.clone_row_list()?;
+        self.flush_parent_in_subtree(proj_output_id)?;
+        let mut new_row = take(
+            self.get_mut_expression_node(proj_output_id)?
+                .get_row_list_mut()?,
+        );
 
         // Create the string concatenation node:
-        // coalesce(cast(NULL as string), '') || coalesce(cast(c as string), '')
+        // coalesce(cast(NULL as string), 'NULL') || coalesce(cast(c as string), 'NULL')
         let mut columns_to_concat: Vec<usize> = Vec::with_capacity(child_motion_key.targets.len());
         for target in &child_motion_key.targets {
             let alias_id = match target {
@@ -258,10 +259,10 @@ impl Plan {
                 alias_id
             };
             let cast_id = self.add_cast(col_id, CastType::String)?;
-            let empty_string_id = self.add_const(Value::String(String::new()));
+            let null_string_id = self.add_const(Value::String(Value::Null.to_hash_string()));
             let fn_coalesce = Function::new("coalesce".to_string(), Behavior::Stable);
             let coalesce_id: usize =
-                self.add_stable_function(&fn_coalesce, vec![cast_id, empty_string_id])?;
+                self.add_stable_function(&fn_coalesce, vec![cast_id, null_string_id])?;
             columns_to_concat.push(coalesce_id);
         }
         let concat_id = if let Some((first, others)) = columns_to_concat.split_first() {
@@ -283,10 +284,10 @@ impl Plan {
         let bucket_id_id: usize = self.add_stable_function(&fn_bucket_id, vec![concat_id])?;
 
         // Push the bucket_id column to the end of the INSERT columns' row.
-        let mut columns = self.insert_columns(insert_id)?.to_vec();
         let table = self.insert_table(insert_id)?;
-        columns.push(table.get_bucket_id_position()?);
-        self.set_insert_columns(insert_id, columns)?;
+        let bucket_id_pos = table.get_bucket_id_position()?;
+        let columns = self.insert_columns_mut(insert_id)?;
+        columns.push(bucket_id_pos);
 
         // Push the bucket_id column to the end of projection row.
         new_row.push(bucket_id_id);
diff --git a/sbroad-core/src/ir/value.rs b/sbroad-core/src/ir/value.rs
index 0b8cc17643..9d98fa787d 100644
--- a/sbroad-core/src/ir/value.rs
+++ b/sbroad-core/src/ir/value.rs
@@ -317,6 +317,11 @@ impl Value {
         }
     }
 
+    /// Cast a value to a different type.
+    ///
+    /// # Errors
+    /// - the value cannot be cast to the given type.
+    #[allow(clippy::too_many_lines)]
     pub fn cast(&self, column_type: &Type) -> Result<Value, SbroadError> {
         match column_type {
             Type::Array => match self {
@@ -384,7 +389,13 @@ impl Value {
                     .map_err(|e| {
                         SbroadError::FailedTo(Action::Serialize, Some(Entity::Value), e.to_string())
                     }),
-                Value::Unsigned(v) => Ok(Value::Integer(*v as i64)),
+                Value::Unsigned(v) => Ok(Value::Integer(i64::try_from(*v).map_err(|e| {
+                    SbroadError::FailedTo(
+                        Action::Serialize,
+                        Some(Entity::Value),
+                        format!("u64 {v} into i64: {e}"),
+                    )
+                })?)),
                 Value::Null => Ok(Value::Null),
                 _ => Err(SbroadError::FailedTo(
                     Action::Serialize,
@@ -422,7 +433,13 @@ impl Value {
             },
             Type::Unsigned => match self {
                 Value::Unsigned(_) => Ok(self.clone()),
-                Value::Integer(v) => Ok(Value::Unsigned(*v as u64)),
+                Value::Integer(v) => Ok(Value::Unsigned(u64::try_from(*v).map_err(|e| {
+                    SbroadError::FailedTo(
+                        Action::Serialize,
+                        Some(Entity::Value),
+                        format!("i64 {v} into u64: {e}"),
+                    )
+                })?)),
                 Value::Decimal(v) => Ok(Value::Unsigned(v.to_u64().ok_or_else(|| {
                     SbroadError::FailedTo(
                         Action::Serialize,
-- 
GitLab