From 1453bdf43f9de887d744ed800f12dd5337620a67 Mon Sep 17 00:00:00 2001
From: Arseniy Volynets <a.volynets@picodata.io>
Date: Fri, 7 Jul 2023 09:45:35 +0000
Subject: [PATCH] feat: support DISTINCT in projection

---
 .../test/integration/groupby_test.lua         |  82 ++++++++++++++
 sbroad-core/src/backend/sql/tree/tests.rs     |   8 +-
 sbroad-core/src/frontend/sql.rs               |  36 +++++--
 sbroad-core/src/frontend/sql/ir/tests.rs      | 101 ++++++++++++++++++
 sbroad-core/src/frontend/sql/query.pest       |   2 +-
 sbroad-core/src/ir.rs                         |  21 ++++
 sbroad-core/src/ir/distribution/tests.rs      |   2 +-
 sbroad-core/src/ir/expression/tests.rs        |   2 +-
 sbroad-core/src/ir/operator.rs                |  14 ++-
 sbroad-core/src/ir/operator/tests.rs          |   8 +-
 .../transformation/redistribution/groupby.rs  |  57 ++++++++--
 .../ir/transformation/redistribution/tests.rs |  10 +-
 .../redistribution/tests/segment.rs           |   2 +-
 sbroad-core/src/ir/tree/tests.rs              |   4 +-
 .../sql/tree/arbitrary_projection_plan.yaml   |   1 +
 .../sql/tree/arithmetic_projection_plan.yaml  |   1 +
 .../sql/tree/arithmetic_selection_plan.yaml   |   1 +
 .../backend/sql/tree/sql_order_selection.yaml |   1 +
 .../ir/distribution/shrink_dist_key_1.yaml    |   1 +
 .../ir/distribution/shrink_dist_key_2.yaml    |   1 +
 .../ir/distribution/shuffle_dist_key.yaml     |   1 +
 .../artifactory/ir/operator/projection.yaml   |   1 +
 .../ir/operator/selection_with_sub_query.yaml |   1 +
 .../full_motion_less_for_sub_query.yaml       |   1 +
 ...otion_non_segment_outer_for_sub_query.yaml |   1 +
 .../redistribution/local_sub_query.yaml       |   1 +
 .../redistribution/multiple_sub_queries.yaml  |   2 +
 .../segment_motion_for_sub_query.yaml         |   1 +
 28 files changed, 328 insertions(+), 36 deletions(-)

diff --git a/sbroad-cartridge/test_app/test/integration/groupby_test.lua b/sbroad-cartridge/test_app/test/integration/groupby_test.lua
index e5a08d686e..7cd2b1ce06 100644
--- a/sbroad-cartridge/test_app/test/integration/groupby_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/groupby_test.lua
@@ -1395,6 +1395,88 @@ groupby_queries.test_aggr_distinct_without_groupby = function()
     })
 end
 
+groupby_queries.test_select_distinct = function()
+    local api = cluster:server("api-1").net_box
+
+
+    local distinct_resp, err = api:call("sbroad.execute", {
+        [[
+        SELECT distinct "a"*2 from "arithmetic_space"
+        ]], {}
+    })
+    t.assert_equals(err, nil)
+    local groupby_resp, err = api:call("sbroad.execute", {
+        [[
+        SELECT "a"*2 from "arithmetic_space"
+        group by "a"*2
+        ]], {}
+    })
+    t.assert_equals(err, nil)
+    t.assert_items_equals(distinct_resp.rows, groupby_resp.rows)
+end
+
+groupby_queries.test_select_distinct2 = function()
+    local api = cluster:server("api-1").net_box
+
+
+    local r, err = api:call("sbroad.execute", {
+        [[
+        SELECT distinct * from
+        (select "e", "f" from "arithmetic_space")
+        ]], {}
+    })
+    t.assert_equals(err, nil)
+    t.assert_equals(r.metadata, {
+        { name = "e", type = "integer" },
+        { name = "f", type = "integer" },
+    })
+    t.assert_items_equals(r.rows, {
+        {2, 2}
+    })
+end
+
+groupby_queries.test_select_distinct3 = function()
+    local api = cluster:server("api-1").net_box
+
+
+    local r, err = api:call("sbroad.execute", {
+        [[
+        SELECT distinct sum("e") from
+        (select "e" from "arithmetic_space")
+        ]], {}
+    })
+    t.assert_equals(err, nil)
+    t.assert_equals(r.metadata, {
+        { name = "COL_1", type = "decimal" },
+    })
+    t.assert_items_equals(r.rows, {
+        { 8 }
+    })
+end
+
+groupby_queries.test_select_distinct4 = function()
+    local api = cluster:server("api-1").net_box
+
+
+    local with_distinct, err = api:call("sbroad.execute", {
+        [[
+        SELECT distinct sum("e") from
+        (select "e", "f" from "arithmetic_space")
+        group by "f"
+        ]], {}
+    })
+    t.assert_equals(err, nil)
+    local without_distinct, err = api:call("sbroad.execute", {
+        [[
+        SELECT sum("e") from
+        (select "e", "f" from "arithmetic_space")
+        group by "f"
+        ]], {}
+    })
+    t.assert_equals(err, nil)
+    t.assert_items_equals(with_distinct.rows, without_distinct.rows)
+end
+
 groupby_queries.test_count_asterisk = function()
     local api = cluster:server("api-1").net_box
 
diff --git a/sbroad-core/src/backend/sql/tree/tests.rs b/sbroad-core/src/backend/sql/tree/tests.rs
index 94dfd082f4..4402c6697f 100644
--- a/sbroad-core/src/backend/sql/tree/tests.rs
+++ b/sbroad-core/src/backend/sql/tree/tests.rs
@@ -33,7 +33,7 @@ fn sql_order_selection() {
     let eq_id = plan.nodes.add_bool(a_id, Bool::Eq, const_row).unwrap();
     let select_id = plan.add_select(&[scan_id], eq_id).unwrap();
 
-    let proj_id = plan.add_proj(select_id, &["a"]).unwrap();
+    let proj_id = plan.add_proj(select_id, &["a"], false).unwrap();
     plan.set_top(proj_id).unwrap();
 
     // check the plan
@@ -153,7 +153,7 @@ fn sql_arithmetic_selection_plan() {
     // where a + (b/c + d*e) * f - b = 1
     let select_id = plan.add_select(&[scan_id], equal_id).unwrap();
 
-    let proj_id = plan.add_proj(select_id, &["a"]).unwrap();
+    let proj_id = plan.add_proj(select_id, &["a"], false).unwrap();
     plan.set_top(proj_id).unwrap();
 
     // check the plan
@@ -341,7 +341,7 @@ fn sql_arithmetic_projection_plan() {
         .unwrap();
 
     let proj_id = plan
-        .add_proj_internal(scan_id, &[arith_subract_id])
+        .add_proj_internal(scan_id, &[arith_subract_id], false)
         .unwrap();
     plan.set_top(proj_id).unwrap();
 
@@ -501,7 +501,7 @@ fn sql_arbitrary_projection_plan() {
     // a + b > c and d is not null
     let and_id = plan.nodes.add_bool(gt_id, Bool::And, unary_id).unwrap();
 
-    let proj_id = plan.add_proj_internal(scan_id, &[and_id]).unwrap();
+    let proj_id = plan.add_proj_internal(scan_id, &[and_id], false).unwrap();
     plan.set_top(proj_id).unwrap();
 
     // check the plan
diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs
index 39ed728400..00f976007f 100644
--- a/sbroad-core/src/frontend/sql.rs
+++ b/sbroad-core/src/frontend/sql.rs
@@ -840,12 +840,33 @@ impl Ast for AbstractSyntaxTree {
                     map.add(id, plan_node_id);
                 }
                 Type::Projection => {
-                    let ast_child_id = node.children.first().ok_or_else(|| {
-                        SbroadError::UnexpectedNumberOfValues("Projection has no children.".into())
-                    })?;
-                    let plan_child_id = map.get(*ast_child_id)?;
-                    let mut proj_columns: Vec<usize> = Vec::with_capacity(node.children.len());
-                    for ast_column_id in node.children.iter().skip(1) {
+                    let (child_id, ast_columns_ids, is_distinct) = if let Some((first, other)) =
+                        node.children.split_first()
+                    {
+                        let mut is_distinct: bool = false;
+                        let mut other = other;
+                        let first_col_ast_id = other.first().ok_or_else(|| {
+                            SbroadError::Invalid(
+                                Entity::AST,
+                                Some("projection ast has no columns!".into()),
+                            )
+                        })?;
+                        if let Type::Distinct = self.nodes.get_node(*first_col_ast_id)?.rule {
+                            is_distinct = true;
+                            (_, other) = other.split_first().ok_or_else(|| {
+                                SbroadError::Invalid(
+                                    Entity::AST,
+                                    Some("projection ast has no children except distinct".into()),
+                                )
+                            })?;
+                        }
+                        (*first, other, is_distinct)
+                    } else {
+                        return Err(SbroadError::Invalid(Entity::AST, None));
+                    };
+                    let plan_child_id = map.get(child_id)?;
+                    let mut proj_columns: Vec<usize> = Vec::with_capacity(ast_columns_ids.len());
+                    for ast_column_id in ast_columns_ids {
                         let ast_column = self.nodes.get_node(*ast_column_id)?;
                         match ast_column.rule {
                             Type::Column => {
@@ -887,7 +908,8 @@ impl Ast for AbstractSyntaxTree {
                             }
                         }
                     }
-                    let projection_id = plan.add_proj_internal(plan_child_id, &proj_columns)?;
+                    let projection_id =
+                        plan.add_proj_internal(plan_child_id, &proj_columns, is_distinct)?;
                     map.add(id, projection_id);
                 }
                 Type::Multiplication | Type::Addition => {
diff --git a/sbroad-core/src/frontend/sql/ir/tests.rs b/sbroad-core/src/frontend/sql/ir/tests.rs
index a62272b464..d94b61f5ad 100644
--- a/sbroad-core/src/frontend/sql/ir/tests.rs
+++ b/sbroad-core/src/frontend/sql/ir/tests.rs
@@ -1818,6 +1818,107 @@ ON "t3"."a" = "ij"."id"
     assert_eq!(expected_explain, plan.as_explain().unwrap());
 }
 
+#[test]
+fn front_sql_select_distinct() {
+    // make sure we don't compute extra group by columns at local stage
+    let input = r#"SELECT distinct "a", "a" + "b" FROM "t""#;
+
+    let plan = sql_to_optimized_ir(input, vec![]);
+    // here we must compute only two groupby columns at local stage: a, b
+    let expected_explain = String::from(
+        r#"projection ("column_22"::unsigned -> "a", "column_27"::unsigned -> "COL_1")
+    group by ("column_22"::unsigned, "column_27"::unsigned) output: ("column_22"::unsigned -> "column_22", "column_27"::unsigned -> "column_27")
+        motion [policy: segment([ref("column_22"), ref("column_27")])]
+            scan
+                projection ("t"."a"::unsigned -> "column_22", ("t"."a"::unsigned) + ("t"."b"::unsigned) -> "column_27")
+                    group by ("t"."a"::unsigned, ("t"."a"::unsigned) + ("t"."b"::unsigned)) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                        scan "t"
+"#,
+    );
+
+    assert_eq!(expected_explain, plan.as_explain().unwrap());
+}
+
+#[test]
+fn front_sql_select_distinct_asterisk() {
+    let input = r#"SELECT distinct * FROM "t""#;
+
+    let plan = sql_to_optimized_ir(input, vec![]);
+    let expected_explain = String::from(
+        r#"projection ("column_23"::unsigned -> "a", "column_24"::unsigned -> "b", "column_25"::unsigned -> "c", "column_26"::unsigned -> "d")
+    group by ("column_24"::unsigned, "column_26"::unsigned, "column_23"::unsigned, "column_25"::unsigned) output: ("column_24"::unsigned -> "column_24", "column_23"::unsigned -> "column_23", "column_26"::unsigned -> "column_26", "column_25"::unsigned -> "column_25")
+        motion [policy: segment([ref("column_24"), ref("column_26"), ref("column_23"), ref("column_25")])]
+            scan
+                projection ("t"."b"::unsigned -> "column_24", "t"."a"::unsigned -> "column_23", "t"."d"::unsigned -> "column_26", "t"."c"::unsigned -> "column_25")
+                    group by ("t"."b"::unsigned, "t"."d"::unsigned, "t"."a"::unsigned, "t"."c"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                        scan "t"
+"#,
+    );
+
+    assert_eq!(expected_explain, plan.as_explain().unwrap());
+}
+
+#[test]
+fn front_sql_invalid_select_distinct() {
+    let input = r#"SELECT distinct "a" FROM "t"
+        group by "b"
+    "#;
+
+    let metadata = &RouterConfigurationMock::new();
+    let ast = AbstractSyntaxTree::new(input).unwrap();
+    let mut plan = ast.resolve_metadata(metadata).unwrap();
+    plan.replace_in_operator().unwrap();
+    plan.split_columns().unwrap();
+    plan.set_dnf().unwrap();
+    plan.derive_equalities().unwrap();
+    plan.merge_tuples().unwrap();
+    let err = plan.add_motions().unwrap_err();
+
+    assert_eq!(
+        true,
+        err.to_string()
+            .contains("column \"a\" is not found in grouping expressions!")
+    );
+}
+
+#[test]
+fn front_sql_select_distinct_with_aggr() {
+    let input = r#"SELECT distinct sum("a"), "b" FROM "t"
+    group by "b"
+    "#;
+
+    let plan = sql_to_optimized_ir(input, vec![]);
+    let expected_explain = String::from(
+        r#"projection (sum(("sum_26"::decimal))::decimal -> "COL_1", "column_12"::unsigned -> "b")
+    group by ("column_12"::unsigned) output: ("column_12"::unsigned -> "column_12", "sum_26"::decimal -> "sum_26")
+        motion [policy: segment([ref("column_12")])]
+            scan
+                projection ("t"."b"::unsigned -> "column_12", sum(("t"."a"::unsigned))::decimal -> "sum_26")
+                    group by ("t"."b"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                        scan "t"
+"#,
+    );
+
+    assert_eq!(expected_explain, plan.as_explain().unwrap());
+}
+
+#[test]
+fn front_sql_select_distinct_with_aggr2() {
+    let input = r#"SELECT distinct sum("a") FROM "t""#;
+
+    let plan = sql_to_optimized_ir(input, vec![]);
+    let expected_explain = String::from(
+        r#"projection (sum(("sum_13"::decimal))::decimal -> "COL_1")
+    motion [policy: full]
+        scan
+            projection (sum(("t"."a"::unsigned))::decimal -> "sum_13")
+                scan "t"
+"#,
+    );
+
+    assert_eq!(expected_explain, plan.as_explain().unwrap());
+}
+
 #[cfg(test)]
 mod params;
 mod single;
diff --git a/sbroad-core/src/frontend/sql/query.pest b/sbroad-core/src/frontend/sql/query.pest
index cb938d450f..e220ecb463 100644
--- a/sbroad-core/src/frontend/sql/query.pest
+++ b/sbroad-core/src/frontend/sql/query.pest
@@ -10,7 +10,7 @@ Query = _{ Except | UnionAll | Select | Values | Insert }
         (^"group" ~ ^"by" ~ GroupBy)? ~
         (^"having" ~ Having)?
     }
-        Projection = { (Asterisk | Column) ~ ("," ~ (Asterisk | Column))* }
+        Projection = { Distinct? ~  (Asterisk | Column) ~ ("," ~ (Asterisk | Column))* }
             Column = { Alias | Expr | Value }
                 Alias = {(Expr | Value) ~ ^"as" ~ AliasName }
                 AliasName = @{ Name }
diff --git a/sbroad-core/src/ir.rs b/sbroad-core/src/ir.rs
index 5270ba27d0..2be54c8ac4 100644
--- a/sbroad-core/src/ir.rs
+++ b/sbroad-core/src/ir.rs
@@ -617,6 +617,27 @@ impl Plan {
         ))
     }
 
+    /// Gets `Projection` column by idx
+    ///
+    /// # Errors
+    /// - supplied index is out of range
+    /// - node is not `Projection`
+    pub fn get_proj_col(&self, proj_id: usize, col_idx: usize) -> Result<usize, SbroadError> {
+        let node = self.get_relation_node(proj_id)?;
+        if let Relational::Projection { output, .. } = node {
+            let col_id = self.get_row_list(*output)?.get(col_idx).ok_or_else(|| {
+                SbroadError::UnexpectedNumberOfValues(format!(
+                    "projection column index out of range. Node: {node:?}"
+                ))
+            })?;
+            return Ok(*col_id);
+        }
+        Err(SbroadError::Invalid(
+            Entity::Node,
+            Some(format!("Expected Projection node. Got: {node:?}")),
+        ))
+    }
+
     /// Gets `GroupBy` columns
     ///
     /// # Errors
diff --git a/sbroad-core/src/ir/distribution/tests.rs b/sbroad-core/src/ir/distribution/tests.rs
index c3c9b4592d..fa57cc8ddd 100644
--- a/sbroad-core/src/ir/distribution/tests.rs
+++ b/sbroad-core/src/ir/distribution/tests.rs
@@ -26,7 +26,7 @@ fn proj_preserve_dist_key() {
     plan.add_rel(t);
 
     let scan_id = plan.add_scan("t", None).unwrap();
-    let proj_id = plan.add_proj(scan_id, &["a", "b"]).unwrap();
+    let proj_id = plan.add_proj(scan_id, &["a", "b"], false).unwrap();
 
     plan.top = Some(proj_id);
 
diff --git a/sbroad-core/src/ir/expression/tests.rs b/sbroad-core/src/ir/expression/tests.rs
index f46c7b3fb7..0d4468fd2d 100644
--- a/sbroad-core/src/ir/expression/tests.rs
+++ b/sbroad-core/src/ir/expression/tests.rs
@@ -56,7 +56,7 @@ fn rel_nodes_from_reference_in_proj() {
     .unwrap();
     plan.add_rel(t);
     let scan_id = plan.add_scan("t", None).unwrap();
-    let proj_id = plan.add_proj(scan_id, &["a"]).unwrap();
+    let proj_id = plan.add_proj(scan_id, &["a"], false).unwrap();
     let output = plan.get_relational_output(proj_id).unwrap();
 
     let rel_set = plan.get_relational_nodes_from_row(output).unwrap();
diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs
index 3f09ff7551..a06bbb6d15 100644
--- a/sbroad-core/src/ir/operator.rs
+++ b/sbroad-core/src/ir/operator.rs
@@ -256,6 +256,8 @@ pub enum Relational {
         children: Vec<usize>,
         /// Outputs tuple node index in the plan node arena.
         output: usize,
+        /// Wheter the select was marked with `distinct` keyword
+        is_distinct: bool,
     },
     ScanRelation {
         // Scan name.
@@ -832,7 +834,7 @@ impl Plan {
                 } else {
                     relation.clone()
                 };
-                let proj_id = self.add_proj(*child, &[])?;
+                let proj_id = self.add_proj(*child, &[], false)?;
                 let sq_id = self.add_sub_query(proj_id, Some(&scan_name))?;
                 children.push(sq_id);
 
@@ -955,11 +957,17 @@ impl Plan {
     /// - child node is not relational
     /// - child output tuple is invalid
     /// - column name do not match the ones in the child output tuple
-    pub fn add_proj(&mut self, child: usize, col_names: &[&str]) -> Result<usize, SbroadError> {
+    pub fn add_proj(
+        &mut self,
+        child: usize,
+        col_names: &[&str],
+        is_distinct: bool,
+    ) -> Result<usize, SbroadError> {
         let output = self.add_row_for_output(child, col_names, false)?;
         let proj = Relational::Projection {
             children: vec![child],
             output,
+            is_distinct,
         };
 
         let proj_id = self.nodes.push(Node::Relational(proj));
@@ -977,11 +985,13 @@ impl Plan {
         &mut self,
         child: usize,
         columns: &[usize],
+        is_distinct: bool,
     ) -> Result<usize, SbroadError> {
         let output = self.nodes.add_row_of_aliases(columns.to_vec(), None)?;
         let proj = Relational::Projection {
             children: vec![child],
             output,
+            is_distinct,
         };
 
         let proj_id = self.nodes.push(Node::Relational(proj));
diff --git a/sbroad-core/src/ir/operator/tests.rs b/sbroad-core/src/ir/operator/tests.rs
index c16f52e3ce..aea9b0b80d 100644
--- a/sbroad-core/src/ir/operator/tests.rs
+++ b/sbroad-core/src/ir/operator/tests.rs
@@ -107,7 +107,7 @@ fn projection() {
     // Invalid alias names in the output
     assert_eq!(
         SbroadError::NotFound(Entity::Column, r#"with name a, e"#.into()),
-        plan.add_proj(scan_id, &["a", "e"]).unwrap_err()
+        plan.add_proj(scan_id, &["a", "e"], false).unwrap_err()
     );
 
     // Expression node instead of relational one
@@ -116,13 +116,13 @@ fn projection() {
             Entity::Node,
             Some("node is not Relational type: Expression(Alias { name: \"a\", child: 0 })".into())
         ),
-        plan.add_proj(1, &["a"]).unwrap_err()
+        plan.add_proj(1, &["a"], false).unwrap_err()
     );
 
     // Try to build projection from the non-existing node
     assert_eq!(
         SbroadError::NotFound(Entity::Node, "from arena with index 42".to_string()),
-        plan.add_proj(42, &["a"]).unwrap_err()
+        plan.add_proj(42, &["a"], false).unwrap_err()
     );
 }
 
@@ -448,7 +448,7 @@ fn selection_with_sub_query() {
     .unwrap();
     plan.add_rel(t2);
     let scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let proj_id = plan.add_proj(scan_t2_id, &["b"]).unwrap();
+    let proj_id = plan.add_proj(scan_t2_id, &["b"], false).unwrap();
     let sq_id = plan.add_sub_query(proj_id, None).unwrap();
     children.push(sq_id);
 
diff --git a/sbroad-core/src/ir/transformation/redistribution/groupby.rs b/sbroad-core/src/ir/transformation/redistribution/groupby.rs
index c68014cdde..966501d164 100644
--- a/sbroad-core/src/ir/transformation/redistribution/groupby.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/groupby.rs
@@ -641,7 +641,7 @@ impl Plan {
             }
         }
 
-        let groupby_id = self.add_groupby(*first_child, other, false)?;
+        let groupby_id = self.add_groupby(*first_child, other, false, None)?;
         Ok(groupby_id)
     }
 
@@ -655,6 +655,7 @@ impl Plan {
         child_id: usize,
         grouping_exprs: &[usize],
         is_final: bool,
+        expr_parent: Option<usize>,
     ) -> Result<usize, SbroadError> {
         let final_output = self.add_row_for_output(child_id, &[], true)?;
         let groupby = Relational::GroupBy {
@@ -668,7 +669,7 @@ impl Plan {
 
         self.replace_parent_in_subtree(final_output, None, Some(groupby_id))?;
         for expr in grouping_exprs.iter() {
-            self.replace_parent_in_subtree(*expr, None, Some(groupby_id))?;
+            self.replace_parent_in_subtree(*expr, expr_parent, Some(groupby_id))?;
         }
 
         Ok(groupby_id)
@@ -788,11 +789,14 @@ impl Plan {
     }
 
     /// Collects information about grouping expressions for future use.
+    /// In case there is a `Projection` with `distinct` modifier and
+    /// no `GroupBy` node, a `GroupBy` node with projection expressions
+    /// will be created.
     /// This function also does all the validation of incorrect usage of
     /// expressions used outside of aggregate functions.
     ///
-    ///
     /// # Returns
+    /// - id of `GroupBy` node if is was created or `upper` otherwise
     /// - list of ids of expressions used in `GroupBy`. Duplicate expressions are removed.
     /// - mapping between `GroupBy` expressions and corresponding expressions in final nodes
     /// (`Projection`, `Having`, `GroupBy`, `OrderBy`).
@@ -811,16 +815,51 @@ impl Plan {
     /// - invalid query with `Having`: in case there's no `GroupBy`, `Having` may contain
     /// only expressions with constants and aggregates. References outside of aggregate functions
     /// are illegal.
+    #[allow(clippy::too_many_lines)]
     fn collect_grouping_expressions(
         &mut self,
         upper: usize,
         finals: &Vec<usize>,
         has_aggregates: bool,
-    ) -> Result<(Vec<usize>, GroupbyExpressionsMap), SbroadError> {
+    ) -> Result<(usize, Vec<usize>, GroupbyExpressionsMap), SbroadError> {
         let mut grouping_expr = vec![];
         let mut gr_expr_map: GroupbyExpressionsMap = HashMap::new();
+        let mut upper = upper;
+
+        let mut has_groupby = matches!(self.get_relation_node(upper)?, Relational::GroupBy { .. });
+
+        if !has_groupby && !has_aggregates {
+            if let Some(proj_id) = finals.first() {
+                if let Relational::Projection {
+                    is_distinct,
+                    output,
+                    ..
+                } = self.get_relation_node(*proj_id)?
+                {
+                    if *is_distinct {
+                        let proj_cols_len = self.get_row_list(*output)?.len();
+                        let mut grouping_exprs: Vec<usize> = Vec::with_capacity(proj_cols_len);
+                        for i in 0..proj_cols_len {
+                            let aliased_col = self.get_proj_col(*proj_id, i)?;
+                            let proj_col_id = if let Expression::Alias { child, .. } =
+                                self.get_expression_node(aliased_col)?
+                            {
+                                *child
+                            } else {
+                                aliased_col
+                            };
+                            // Copy expression from Projection to GroupBy.
+                            let col = self.clone_expr_subtree(proj_col_id)?;
+                            grouping_exprs.push(col);
+                        }
+                        upper = self.add_groupby(upper, &grouping_exprs, false, Some(*proj_id))?;
+
+                        has_groupby = true;
+                    }
+                }
+            }
+        }
 
-        let has_groupby = matches!(self.get_relation_node(upper)?, Relational::GroupBy { .. });
         if has_groupby {
             let old_gr_cols = self.get_grouping_cols(upper)?;
             // remove duplicate expressions
@@ -901,7 +940,7 @@ impl Plan {
             }
         }
 
-        Ok((grouping_expr, gr_expr_map))
+        Ok((upper, grouping_expr, gr_expr_map))
     }
 
     /// Add expressions used as arguments to distinct aggregates to `GroupBy` in reduce stage
@@ -922,7 +961,8 @@ impl Plan {
             {
                 gr_cols.extend(additional_grouping_exprs.into_iter());
             } else {
-                local_proj_child_id = self.add_groupby(upper, &additional_grouping_exprs, true)?;
+                local_proj_child_id =
+                    self.add_groupby(upper, &additional_grouping_exprs, true, None)?;
                 self.set_distribution(self.get_relational_output(local_proj_child_id)?)?;
             }
         }
@@ -999,6 +1039,7 @@ impl Plan {
         let proj = Relational::Projection {
             output: proj_output,
             children: vec![child_id],
+            is_distinct: false,
         };
         let proj_id = self.nodes.push(Node::Relational(proj));
         for info in aggr_infos {
@@ -1657,7 +1698,7 @@ impl Plan {
         let (finals, upper) = self.split_reduce_stage(final_proj_id)?;
         let mut aggr_infos = self.collect_aggregates(&finals)?;
         let has_aggregates = !aggr_infos.is_empty();
-        let (grouping_exprs, gr_expr_map) =
+        let (upper, grouping_exprs, gr_expr_map) =
             self.collect_grouping_expressions(upper, &finals, has_aggregates)?;
         if grouping_exprs.is_empty() && aggr_infos.is_empty() {
             return Ok(false);
diff --git a/sbroad-core/src/ir/transformation/redistribution/tests.rs b/sbroad-core/src/ir/transformation/redistribution/tests.rs
index 45a93620e2..cc52630f15 100644
--- a/sbroad-core/src/ir/transformation/redistribution/tests.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/tests.rs
@@ -39,7 +39,7 @@ fn full_motion_less_for_sub_query() {
     .unwrap();
     plan.add_rel(t2);
     let scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let proj_id = plan.add_proj(scan_t2_id, &["b"]).unwrap();
+    let proj_id = plan.add_proj(scan_t2_id, &["b"], false).unwrap();
     let sq_id = plan.add_sub_query(proj_id, None).unwrap();
     children.push(sq_id);
 
@@ -100,7 +100,7 @@ fn full_motion_non_segment_outer_for_sub_query() {
     .unwrap();
     plan.add_rel(t2);
     let scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let proj_id = plan.add_proj(scan_t2_id, &["a"]).unwrap();
+    let proj_id = plan.add_proj(scan_t2_id, &["a"], false).unwrap();
     let sq_id = plan.add_sub_query(proj_id, None).unwrap();
     children.push(sq_id);
 
@@ -158,7 +158,7 @@ fn local_sub_query() {
     .unwrap();
     plan.add_rel(t2);
     let scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let proj_id = plan.add_proj(scan_t2_id, &["a"]).unwrap();
+    let proj_id = plan.add_proj(scan_t2_id, &["a"], false).unwrap();
     let sq_id = plan.add_sub_query(proj_id, None).unwrap();
     children.push(sq_id);
 
@@ -218,13 +218,13 @@ fn multiple_sub_queries() {
     .unwrap();
     plan.add_rel(t2);
     let sq1_scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let sq1_proj_id = plan.add_proj(sq1_scan_t2_id, &["a"]).unwrap();
+    let sq1_proj_id = plan.add_proj(sq1_scan_t2_id, &["a"], false).unwrap();
     let sq1_id = plan.add_sub_query(sq1_proj_id, None).unwrap();
     children.push(sq1_id);
     let sq1_pos = children.len() - 1;
 
     let sq2_scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let sq2_proj_id = plan.add_proj(sq2_scan_t2_id, &["b"]).unwrap();
+    let sq2_proj_id = plan.add_proj(sq2_scan_t2_id, &["b"], false).unwrap();
     let sq2_id = plan.add_sub_query(sq2_proj_id, None).unwrap();
     children.push(sq2_id);
     let sq2_pos = children.len() - 1;
diff --git a/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs b/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs
index 78a9fab026..829319b767 100644
--- a/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs
@@ -45,7 +45,7 @@ fn sub_query1() {
     .unwrap();
     plan.add_rel(t2);
     let scan_t2_id = plan.add_scan("t2", None).unwrap();
-    let proj_id = plan.add_proj(scan_t2_id, &["b"]).unwrap();
+    let proj_id = plan.add_proj(scan_t2_id, &["b"], false).unwrap();
     let sq_id = plan.add_sub_query(proj_id, None).unwrap();
     children.push(sq_id);
 
diff --git a/sbroad-core/src/ir/tree/tests.rs b/sbroad-core/src/ir/tree/tests.rs
index cfe0ee8229..2d7eaf06bf 100644
--- a/sbroad-core/src/ir/tree/tests.rs
+++ b/sbroad-core/src/ir/tree/tests.rs
@@ -143,7 +143,7 @@ fn selection_subquery_dfs_post() {
     let const1 = plan.add_const(Value::from(1_u64));
     let eq_op = plan.nodes.add_bool(b, Bool::Eq, const1).unwrap();
     let selection_t2_id = plan.add_select(&[scan_t2_id], eq_op).unwrap();
-    let proj_id = plan.add_proj(selection_t2_id, &["c"]).unwrap();
+    let proj_id = plan.add_proj(selection_t2_id, &["c"], false).unwrap();
     let sq_id = plan.add_sub_query(proj_id, None).unwrap();
     let c = plan.get_row_from_rel_node(sq_id).unwrap();
 
@@ -217,7 +217,7 @@ fn subtree_dfs_post() {
     let const1 = plan.add_const(Value::from(1_i64));
     let eq_op = plan.nodes.add_bool(a, Bool::Eq, const1).unwrap();
     let selection_t1_id = plan.add_select(&[scan_t1_id], eq_op).unwrap();
-    let proj_id = plan.add_proj(selection_t1_id, &["c"]).unwrap();
+    let proj_id = plan.add_proj(selection_t1_id, &["c"], false).unwrap();
 
     plan.set_top(proj_id).unwrap();
     let top = plan.get_top().unwrap();
diff --git a/sbroad-core/tests/artifactory/backend/sql/tree/arbitrary_projection_plan.yaml b/sbroad-core/tests/artifactory/backend/sql/tree/arbitrary_projection_plan.yaml
index 5aca515d4c..1bccfb4733 100644
--- a/sbroad-core/tests/artifactory/backend/sql/tree/arbitrary_projection_plan.yaml
+++ b/sbroad-core/tests/artifactory/backend/sql/tree/arbitrary_projection_plan.yaml
@@ -163,6 +163,7 @@ nodes:
           children:
             - 11
           output: 24
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_projection_plan.yaml b/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_projection_plan.yaml
index cc74da7c25..1f7eee391f 100644
--- a/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_projection_plan.yaml
+++ b/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_projection_plan.yaml
@@ -233,6 +233,7 @@ nodes:
           children:
             - 15
           output: 34
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_selection_plan.yaml b/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_selection_plan.yaml
index 4771ec3b9a..052b330ba3 100644
--- a/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_selection_plan.yaml
+++ b/sbroad-core/tests/artifactory/backend/sql/tree/arithmetic_selection_plan.yaml
@@ -365,6 +365,7 @@ nodes:
           children:
             - 52
           output: 55
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/backend/sql/tree/sql_order_selection.yaml b/sbroad-core/tests/artifactory/backend/sql/tree/sql_order_selection.yaml
index 95ba03c2d2..a185c11dcb 100644
--- a/sbroad-core/tests/artifactory/backend/sql/tree/sql_order_selection.yaml
+++ b/sbroad-core/tests/artifactory/backend/sql/tree/sql_order_selection.yaml
@@ -106,6 +106,7 @@ nodes:
           children:
             - 12
           output: 15
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_1.yaml b/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_1.yaml
index 5dd1672993..a692dafc19 100644
--- a/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_1.yaml
+++ b/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_1.yaml
@@ -86,6 +86,7 @@ nodes:
           children:
             - 9
           output: 14
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_2.yaml b/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_2.yaml
index adc549d8e9..8bc6b34e1a 100644
--- a/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_2.yaml
+++ b/sbroad-core/tests/artifactory/ir/distribution/shrink_dist_key_2.yaml
@@ -74,6 +74,7 @@ nodes:
           children:
             - 9
           output: 12
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/ir/distribution/shuffle_dist_key.yaml b/sbroad-core/tests/artifactory/ir/distribution/shuffle_dist_key.yaml
index 792d51928f..67625cb4d8 100644
--- a/sbroad-core/tests/artifactory/ir/distribution/shuffle_dist_key.yaml
+++ b/sbroad-core/tests/artifactory/ir/distribution/shuffle_dist_key.yaml
@@ -86,6 +86,7 @@ nodes:
           children:
             - 9
           output: 14
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/ir/operator/projection.yaml b/sbroad-core/tests/artifactory/ir/operator/projection.yaml
index bf487d47e0..b4c8456fd7 100644
--- a/sbroad-core/tests/artifactory/ir/operator/projection.yaml
+++ b/sbroad-core/tests/artifactory/ir/operator/projection.yaml
@@ -63,6 +63,7 @@ nodes:
           children:
             - 5
           output: 8
+          is_distinct: false
 relations:
   tables:
     t:
diff --git a/sbroad-core/tests/artifactory/ir/operator/selection_with_sub_query.yaml b/sbroad-core/tests/artifactory/ir/operator/selection_with_sub_query.yaml
index 235d9a94f6..da03e6f396 100644
--- a/sbroad-core/tests/artifactory/ir/operator/selection_with_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/operator/selection_with_sub_query.yaml
@@ -63,6 +63,7 @@ nodes:
           children:
             - 7
           output: 10
+          is_distinct: false
     - Expression:
         Reference:
           targets:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml
index 112849f18f..8919a2a5f1 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml
@@ -79,6 +79,7 @@ nodes:
           children:
             - 9
           output: 12
+          is_distinct: false
     - Expression:
         Reference:
           targets:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml
index d5850c89b5..0e2c032ef2 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml
@@ -83,6 +83,7 @@ nodes:
           children:
             - 9
           output: 12
+          is_distinct: false
     - Expression:
         Reference:
           targets:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/local_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/local_sub_query.yaml
index 0ebac3494f..c963ab83f2 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/local_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/local_sub_query.yaml
@@ -72,6 +72,7 @@ nodes:
           children:
             - 7
           output: 10
+          is_distinct: false
     - Expression:
         Reference:
           targets:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml
index 19ef19754d..9c42798c7a 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml
@@ -83,6 +83,7 @@ nodes:
           children:
             - 9
           output: 12
+          is_distinct: false
     - Expression:
         Reference:
           targets:
@@ -164,6 +165,7 @@ nodes:
           children:
             - 23
           output: 26
+          is_distinct: false
     - Expression:
         Reference:
           targets:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/segment_motion_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/segment_motion_for_sub_query.yaml
index c6feb87d02..7d400bbb86 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/segment_motion_for_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/segment_motion_for_sub_query.yaml
@@ -82,6 +82,7 @@ nodes:
           children:
             - 9
           output: 12
+          is_distinct: false
     - Expression:
         Reference:
           targets:
-- 
GitLab