diff --git a/sbroad-cartridge/test_app/test/integration/join_test.lua b/sbroad-cartridge/test_app/test/integration/join_test.lua
index 64a501b8fe0fde57f1be1fb4f66ed62f61e0606e..5781ddb0cb2c1e9ac008163d8acfb7516148253f 100644
--- a/sbroad-cartridge/test_app/test/integration/join_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/join_test.lua
@@ -50,7 +50,7 @@ g.test_join_vtable_with_same_column_names = function()
     local api = cluster:server("api-1").net_box
 
     local res, err = api:call("sbroad.execute", {
-        [[select distinct *
+        [[select *
           from
             "testing_space"
           join
@@ -73,12 +73,12 @@ g.test_join_vtable_with_same_column_names = function()
         {name = "s", type = "integer"},
     })
     t.assert_equals(res.rows, {
+        {1, "a", 1, 1, 1},
         {2, "a", 1, 2, 2},
         {3, "a", 2, 3, 3},
         {4, "b", 1, 4, 4},
-        {5, "b", 2, 5, 5},
         {6, "b", 3, 6, 6},
+        {5, "b", 2, 5, 5},
         {7, "c", 4, 7, 7},
-        {1, "a", 1, 1, 1},
     })
 end
diff --git a/sbroad-core/src/frontend/sql/ir/tests.rs b/sbroad-core/src/frontend/sql/ir/tests.rs
index 1899d480464f2959900f342310f9201ccdfdc75a..7c23ef7c767148e2f415574ea48d2d2400b90a8d 100644
--- a/sbroad-core/src/frontend/sql/ir/tests.rs
+++ b/sbroad-core/src/frontend/sql/ir/tests.rs
@@ -1435,19 +1435,18 @@ fn front_sql_groupby_join_1() {
     println!("{}", plan.as_explain().unwrap());
     let expected_explain = String::from(
         r#"projection ("column_63"::string -> "product_code", "column_64"::boolean -> "product_units")
-    group by ("column_63"::string, "column_64"::boolean) output: ("column_64"::boolean -> "column_64", "column_63"::string -> "column_63")
+    group by ("column_63"::string, "column_64"::boolean) output: ("column_63"::string -> "column_63", "column_64"::boolean -> "column_64")
         motion [policy: segment([ref("column_63"), ref("column_64")])]
-            scan
-                projection ("t2"."product_units"::boolean -> "column_64", "t2"."product_code"::string -> "column_63")
-                    group by ("t2"."product_code"::string, "t2"."product_units"::boolean) output: ("t2"."product_units"::boolean -> "product_units", "t2"."product_code"::string -> "product_code", "t2"."identification_number"::integer -> "identification_number", "t"."id"::unsigned -> "id")
-                        join on ROW("t2"."identification_number"::integer) = ROW("t"."id"::unsigned)
-                            scan "t2"
-                                projection ("hash_testing"."product_units"::boolean -> "product_units", "hash_testing"."product_code"::string -> "product_code", "hash_testing"."identification_number"::integer -> "identification_number")
-                                    scan "hash_testing"
-                            motion [policy: full]
-                                scan "t"
-                                    projection ("test_space"."id"::unsigned -> "id")
-                                        scan "test_space"
+            projection ("t2"."product_code"::string -> "column_63", "t2"."product_units"::boolean -> "column_64")
+                group by ("t2"."product_code"::string, "t2"."product_units"::boolean) output: ("t2"."product_units"::boolean -> "product_units", "t2"."product_code"::string -> "product_code", "t2"."identification_number"::integer -> "identification_number", "t"."id"::unsigned -> "id")
+                    join on ROW("t2"."identification_number"::integer) = ROW("t"."id"::unsigned)
+                        scan "t2"
+                            projection ("hash_testing"."product_units"::boolean -> "product_units", "hash_testing"."product_code"::string -> "product_code", "hash_testing"."identification_number"::integer -> "identification_number")
+                                scan "hash_testing"
+                        motion [policy: full]
+                            scan "t"
+                                projection ("test_space"."id"::unsigned -> "id")
+                                    scan "test_space"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -1549,13 +1548,12 @@ fn front_sql_groupby_insert() {
     let expected_explain = String::from(
         r#"insert "t" on conflict: fail
     motion [policy: segment([value(NULL), ref("d")])]
-        projection ("column_764"::unsigned -> "b", "column_864"::unsigned -> "d")
-            group by ("column_864"::unsigned, "column_764"::unsigned) output: ("column_764"::unsigned -> "column_764", "column_864"::unsigned -> "column_864")
-                motion [policy: segment([ref("column_864"), ref("column_764")])]
-                    scan
-                        projection ("t"."b"::unsigned -> "column_764", "t"."d"::unsigned -> "column_864")
-                            group by ("t"."d"::unsigned, "t"."b"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
-                                scan "t"
+        projection ("column_12"::unsigned -> "b", "column_13"::unsigned -> "d")
+            group by ("column_12"::unsigned, "column_13"::unsigned) output: ("column_12"::unsigned -> "column_12", "column_13"::unsigned -> "column_13")
+                motion [policy: segment([ref("column_12"), ref("column_13")])]
+                    projection ("t"."b"::unsigned -> "column_12", "t"."d"::unsigned -> "column_13")
+                        group by ("t"."b"::unsigned, "t"."d"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                            scan "t"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -2441,12 +2439,11 @@ fn front_sql_groupby_expression3() {
     println!("{}", plan.as_explain().unwrap());
     let expected_explain = String::from(
         r#"projection ("column_16"::unsigned -> "col_1", "column_27"::unsigned * ROW(sum(("sum_59"::decimal))::decimal) / ROW(sum(("count_65"::integer))::decimal) -> "col_2")
-    group by ("column_16"::unsigned, "column_27"::unsigned) output: ("column_27"::unsigned -> "column_27", "column_16"::unsigned -> "column_16", "count_65"::integer -> "count_65", "sum_59"::decimal -> "sum_59")
+    group by ("column_16"::unsigned, "column_27"::unsigned) output: ("column_16"::unsigned -> "column_16", "column_27"::unsigned -> "column_27", "count_65"::integer -> "count_65", "sum_59"::decimal -> "sum_59")
         motion [policy: segment([ref("column_16"), ref("column_27")])]
-            scan
-                projection ((ROW("t"."c"::unsigned) * ROW("t"."d"::unsigned)) -> "column_27", ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned) -> "column_16", count((ROW("t"."a"::unsigned) * ROW("t"."b"::unsigned)))::integer -> "count_65", sum((ROW("t"."c"::unsigned) * ROW("t"."d"::unsigned)))::decimal -> "sum_59")
-                    group by (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned), (ROW("t"."c"::unsigned) * ROW("t"."d"::unsigned))) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
-                        scan "t"
+            projection (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned) -> "column_16", (ROW("t"."c"::unsigned) * ROW("t"."d"::unsigned)) -> "column_27", count((ROW("t"."a"::unsigned) * ROW("t"."b"::unsigned)))::integer -> "count_65", sum((ROW("t"."c"::unsigned) * ROW("t"."d"::unsigned)))::decimal -> "sum_59")
+                group by (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned), (ROW("t"."c"::unsigned) * ROW("t"."d"::unsigned))) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                    scan "t"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -2465,12 +2462,11 @@ fn front_sql_groupby_expression4() {
     println!("{}", plan.as_explain().unwrap());
     let expected_explain = String::from(
         r#"projection ("column_16"::unsigned -> "col_1", "column_17"::unsigned -> "a")
-    group by ("column_16"::unsigned, "column_17"::unsigned) output: ("column_17"::unsigned -> "column_17", "column_16"::unsigned -> "column_16")
+    group by ("column_16"::unsigned, "column_17"::unsigned) output: ("column_16"::unsigned -> "column_16", "column_17"::unsigned -> "column_17")
         motion [policy: segment([ref("column_16"), ref("column_17")])]
-            scan
-                projection ("t"."a"::unsigned -> "column_17", ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned) -> "column_16")
-                    group by (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned), "t"."a"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
-                        scan "t"
+            projection (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned) -> "column_16", "t"."a"::unsigned -> "column_17")
+                group by (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned), "t"."a"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                    scan "t"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -2493,22 +2489,20 @@ fn front_sql_groupby_with_aggregates() {
         r#"projection ("t1"."a"::unsigned -> "a", "t1"."b"::unsigned -> "b", "t1"."c"::decimal -> "c", "t2"."g"::unsigned -> "g", "t2"."e"::unsigned -> "e", "t2"."f"::decimal -> "f")
     join on ROW("t1"."a"::unsigned, "t2"."g"::unsigned) = ROW("t2"."e"::unsigned, "t1"."b"::unsigned)
         scan "t1"
-            projection ("column_764"::unsigned -> "a", "column_864"::unsigned -> "b", sum(("sum_31"::decimal))::decimal -> "c")
-                group by ("column_864"::unsigned, "column_764"::unsigned) output: ("column_764"::unsigned -> "column_764", "column_864"::unsigned -> "column_864", "sum_31"::decimal -> "sum_31")
-                    motion [policy: segment([ref("column_864"), ref("column_764")])]
-                        scan
-                            projection ("t"."a"::unsigned -> "column_764", "t"."b"::unsigned -> "column_864", sum(("t"."c"::unsigned))::decimal -> "sum_31")
-                                group by ("t"."b"::unsigned, "t"."a"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
-                                    scan "t"
+            projection ("column_12"::unsigned -> "a", "column_13"::unsigned -> "b", sum(("sum_31"::decimal))::decimal -> "c")
+                group by ("column_12"::unsigned, "column_13"::unsigned) output: ("column_12"::unsigned -> "column_12", "column_13"::unsigned -> "column_13", "sum_31"::decimal -> "sum_31")
+                    motion [policy: segment([ref("column_12"), ref("column_13")])]
+                        projection ("t"."a"::unsigned -> "column_12", "t"."b"::unsigned -> "column_13", sum(("t"."c"::unsigned))::decimal -> "sum_31")
+                            group by ("t"."a"::unsigned, "t"."b"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                                scan "t"
         motion [policy: full]
             scan "t2"
                 projection ("column_55"::unsigned -> "g", "column_56"::unsigned -> "e", sum(("sum_74"::decimal))::decimal -> "f")
-                    group by ("column_55"::unsigned, "column_56"::unsigned) output: ("column_56"::unsigned -> "column_56", "column_55"::unsigned -> "column_55", "sum_74"::decimal -> "sum_74")
+                    group by ("column_55"::unsigned, "column_56"::unsigned) output: ("column_55"::unsigned -> "column_55", "column_56"::unsigned -> "column_56", "sum_74"::decimal -> "sum_74")
                         motion [policy: segment([ref("column_55"), ref("column_56")])]
-                            scan
-                                projection ("t2"."e"::unsigned -> "column_56", "t2"."g"::unsigned -> "column_55", sum(("t2"."f"::unsigned))::decimal -> "sum_74")
-                                    group by ("t2"."g"::unsigned, "t2"."e"::unsigned) output: ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h", "t2"."bucket_id"::unsigned -> "bucket_id")
-                                        scan "t2"
+                            projection ("t2"."g"::unsigned -> "column_55", "t2"."e"::unsigned -> "column_56", sum(("t2"."f"::unsigned))::decimal -> "sum_74")
+                                group by ("t2"."g"::unsigned, "t2"."e"::unsigned) output: ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h", "t2"."bucket_id"::unsigned -> "bucket_id")
+                                    scan "t2"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -2833,16 +2827,15 @@ fn front_sql_having_with_sq_segment_motion() {
     println!("{}", plan.as_explain().unwrap());
 
     let expected_explain = String::from(
-        r#"projection ("column_764"::unsigned -> "sysFrom", "column_864"::unsigned -> "sys_op", sum(distinct ("column_70"::decimal))::decimal -> "sum", count(distinct ("column_70"::integer))::integer -> "count")
-    having ROW("column_764"::unsigned, "column_864"::unsigned) in ROW($0, $0)
-        group by ("column_864"::unsigned, "column_764"::unsigned) output: ("column_764"::unsigned -> "column_764", "column_70"::unsigned -> "column_70", "column_864"::unsigned -> "column_864")
-            motion [policy: segment([ref("column_864"), ref("column_764")])]
-                scan
-                    projection ("test_space"."sysFrom"::unsigned -> "column_764", "test_space"."id"::unsigned -> "column_70", "test_space"."sys_op"::unsigned -> "column_864")
-                        group by ("test_space"."sys_op"::unsigned, "test_space"."sysFrom"::unsigned, "test_space"."id"::unsigned) output: ("test_space"."id"::unsigned -> "id", "test_space"."sysFrom"::unsigned -> "sysFrom", "test_space"."FIRST_NAME"::string -> "FIRST_NAME", "test_space"."sys_op"::unsigned -> "sys_op", "test_space"."bucket_id"::unsigned -> "bucket_id")
-                            scan "test_space"
+        r#"projection ("column_12"::unsigned -> "sysFrom", "column_13"::unsigned -> "sys_op", sum(distinct ("column_70"::decimal))::decimal -> "sum", count(distinct ("column_70"::integer))::integer -> "count")
+    having ROW("column_12"::unsigned, "column_13"::unsigned) in ROW($0, $0)
+        group by ("column_12"::unsigned, "column_13"::unsigned) output: ("column_12"::unsigned -> "column_12", "column_13"::unsigned -> "column_13", "column_70"::unsigned -> "column_70")
+            motion [policy: segment([ref("column_12"), ref("column_13")])]
+                projection ("test_space"."sysFrom"::unsigned -> "column_12", "test_space"."sys_op"::unsigned -> "column_13", "test_space"."id"::unsigned -> "column_70")
+                    group by ("test_space"."sysFrom"::unsigned, "test_space"."sys_op"::unsigned, "test_space"."id"::unsigned) output: ("test_space"."id"::unsigned -> "id", "test_space"."sysFrom"::unsigned -> "sysFrom", "test_space"."FIRST_NAME"::string -> "FIRST_NAME", "test_space"."sys_op"::unsigned -> "sys_op", "test_space"."bucket_id"::unsigned -> "bucket_id")
+                        scan "test_space"
 subquery $0:
-motion [policy: segment([ref("d"), ref("a")])]
+motion [policy: segment([ref("a"), ref("d")])]
             scan
                 projection ("t"."a"::unsigned -> "a", "t"."d"::unsigned -> "d")
                     scan "t"
@@ -2870,16 +2863,15 @@ fn front_sql_having_with_sq_segment_local_motion() {
     let expected_explain = String::from(
         r#"projection ("column_12"::unsigned -> "sysFrom", "column_13"::unsigned -> "sys_op", sum(distinct ("column_70"::decimal))::decimal -> "sum", count(distinct ("column_70"::integer))::integer -> "count")
     having ROW("column_12"::unsigned, "column_13"::unsigned) in ROW($0, $0)
-        group by ("column_13"::unsigned, "column_12"::unsigned) output: ("column_12"::unsigned -> "column_12", "column_70"::unsigned -> "column_70", "column_13"::unsigned -> "column_13")
-            motion [policy: segment([ref("column_13"), ref("column_12")])]
-                projection ("test_space"."sysFrom"::unsigned -> "column_12", "test_space"."id"::unsigned -> "column_70", "test_space"."sys_op"::unsigned -> "column_13")
-                    group by ("test_space"."sys_op"::unsigned, "test_space"."sysFrom"::unsigned, "test_space"."id"::unsigned) output: ("test_space"."id"::unsigned -> "id", "test_space"."sysFrom"::unsigned -> "sysFrom", "test_space"."FIRST_NAME"::string -> "FIRST_NAME", "test_space"."sys_op"::unsigned -> "sys_op", "test_space"."bucket_id"::unsigned -> "bucket_id")
+        group by ("column_12"::unsigned, "column_13"::unsigned) output: ("column_12"::unsigned -> "column_12", "column_13"::unsigned -> "column_13", "column_70"::unsigned -> "column_70")
+            motion [policy: segment([ref("column_12"), ref("column_13")])]
+                projection ("test_space"."sysFrom"::unsigned -> "column_12", "test_space"."sys_op"::unsigned -> "column_13", "test_space"."id"::unsigned -> "column_70")
+                    group by ("test_space"."sysFrom"::unsigned, "test_space"."sys_op"::unsigned, "test_space"."id"::unsigned) output: ("test_space"."id"::unsigned -> "id", "test_space"."sysFrom"::unsigned -> "sysFrom", "test_space"."FIRST_NAME"::string -> "FIRST_NAME", "test_space"."sys_op"::unsigned -> "sys_op", "test_space"."bucket_id"::unsigned -> "bucket_id")
                         scan "test_space"
 subquery $0:
-motion [policy: segment([ref("b"), ref("a")])]
-            scan
-                projection ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b")
-                    scan "t"
+scan
+            projection ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b")
+                scan "t"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -2988,12 +2980,11 @@ fn front_sql_select_distinct() {
     // here we must compute only two groupby columns at local stage: a, b
     let expected_explain = String::from(
         r#"projection ("column_22"::unsigned -> "a", "column_27"::unsigned -> "col_1")
-    group by ("column_27"::unsigned, "column_22"::unsigned) output: ("column_22"::unsigned -> "column_22", "column_27"::unsigned -> "column_27")
-        motion [policy: segment([ref("column_27"), ref("column_22")])]
-            scan
-                projection ("t"."a"::unsigned -> "column_22", ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned) -> "column_27")
-                    group by (ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned), "t"."a"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
-                        scan "t"
+    group by ("column_22"::unsigned, "column_27"::unsigned) output: ("column_22"::unsigned -> "column_22", "column_27"::unsigned -> "column_27")
+        motion [policy: segment([ref("column_22"), ref("column_27")])]
+            projection ("t"."a"::unsigned -> "column_22", ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned) -> "column_27")
+                group by ("t"."a"::unsigned, ROW("t"."a"::unsigned) + ROW("t"."b"::unsigned)) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                    scan "t"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
@@ -3011,12 +3002,11 @@ fn front_sql_select_distinct_asterisk() {
     println!("{}", plan.as_explain().unwrap());
     let expected_explain = String::from(
         r#"projection ("column_23"::unsigned -> "a", "column_24"::unsigned -> "b", "column_25"::unsigned -> "c", "column_26"::unsigned -> "d")
-    group by ("column_24"::unsigned, "column_23"::unsigned, "column_25"::unsigned, "column_26"::unsigned) output: ("column_24"::unsigned -> "column_24", "column_23"::unsigned -> "column_23", "column_25"::unsigned -> "column_25", "column_26"::unsigned -> "column_26")
-        motion [policy: segment([ref("column_24"), ref("column_23"), ref("column_25"), ref("column_26")])]
-            scan
-                projection ("t"."b"::unsigned -> "column_24", "t"."a"::unsigned -> "column_23", "t"."c"::unsigned -> "column_25", "t"."d"::unsigned -> "column_26")
-                    group by ("t"."b"::unsigned, "t"."a"::unsigned, "t"."c"::unsigned, "t"."d"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
-                        scan "t"
+    group by ("column_23"::unsigned, "column_24"::unsigned, "column_25"::unsigned, "column_26"::unsigned) output: ("column_23"::unsigned -> "column_23", "column_24"::unsigned -> "column_24", "column_25"::unsigned -> "column_25", "column_26"::unsigned -> "column_26")
+        motion [policy: segment([ref("column_23"), ref("column_24"), ref("column_25"), ref("column_26")])]
+            projection ("t"."a"::unsigned -> "column_23", "t"."b"::unsigned -> "column_24", "t"."c"::unsigned -> "column_25", "t"."d"::unsigned -> "column_26")
+                group by ("t"."a"::unsigned, "t"."b"::unsigned, "t"."c"::unsigned, "t"."d"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id")
+                    scan "t"
 execution options:
 sql_vdbe_max_steps = 45000
 vtable_max_rows = 5000
diff --git a/sbroad-core/src/ir/transformation/redistribution/groupby.rs b/sbroad-core/src/ir/transformation/redistribution/groupby.rs
index 1d7a5ec604c06f8c36ca4dadd90ad8579173a6c9..37f04a3ec38153ce48e2b6e393bacc88233b8e32 100644
--- a/sbroad-core/src/ir/transformation/redistribution/groupby.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/groupby.rs
@@ -23,7 +23,7 @@ use std::collections::{HashMap, HashSet};
 
 use crate::ir::function::{Behavior, Function};
 use crate::ir::helpers::RepeatableState;
-use std::hash::{Hash, Hasher};
+use std::hash::{BuildHasher, Hash, Hasher, RandomState};
 use std::rc::Rc;
 
 const AGGR_CAPACITY: usize = 10;
@@ -134,7 +134,7 @@ impl<'plan, 'args> PartialEq<Self> for AggregateSignature<'plan, 'args> {
 
 impl<'plan, 'args> Eq for AggregateSignature<'plan, 'args> {}
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct GroupingExpression<'plan> {
     pub id: NodeId,
     pub plan: &'plan Plan,
@@ -353,6 +353,75 @@ impl<'plan> ExpressionMapper<'plan> {
     }
 }
 
+struct OrderedMap<K, V, S = RandomState> {
+    map: HashMap<K, V, S>,
+    order: Vec<(K, V)>,
+}
+
+impl<K: Clone + Hash + Eq, V: Clone, S: BuildHasher> OrderedMap<K, V, S> {
+    fn with_hasher(hasher: S) -> Self {
+        Self {
+            map: HashMap::<K, V, S>::with_hasher(hasher),
+            order: Vec::<(K, V)>::new(),
+        }
+    }
+
+    fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
+        Self {
+            map: HashMap::<K, V, S>::with_capacity_and_hasher(capacity, hasher),
+            order: Vec::<(K, V)>::new(),
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.map.len()
+    }
+
+    fn get(&self, key: &K) -> Option<&V> {
+        self.map.get(key)
+    }
+
+    fn remove(&mut self, key: &K) -> Option<V> {
+        self.order.retain(|(k, _)| k != key);
+        self.map.remove(key)
+    }
+
+    fn insert(&mut self, key: K, value: V) {
+        if self.map.insert(key.clone(), value.clone()).is_none() {
+            self.order.push((key, value));
+        }
+    }
+
+    fn get_ordered(&self) -> &Vec<(K, V)> {
+        &self.order
+    }
+}
+
+struct OrderedSet<V, S = RandomState> {
+    set: OrderedMap<V, (), S>,
+}
+
+impl<V: Clone + Hash + Eq, S: BuildHasher> OrderedSet<V, S> {
+    fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
+        Self {
+            set: OrderedMap::<V, (), S>::with_capacity_and_hasher(capacity, hasher),
+        }
+    }
+
+    fn insert(&mut self, value: V) {
+        self.set.insert(value, ())
+    }
+
+    fn get_ordered(&self) -> Vec<V> {
+        self.set
+            .get_ordered()
+            .iter()
+            .map(|(v, _)| v)
+            .cloned()
+            .collect()
+    }
+}
+
 impl Plan {
     #[allow(unreachable_code)]
     fn generate_local_alias(id: NodeId) -> String {
@@ -894,15 +963,16 @@ impl Plan {
         if has_groupby {
             let old_gr_cols = self.get_grouping_cols(upper)?;
             // remove duplicate expressions
-            let mut unique_grouping_exprs: HashSet<GroupingExpression, _> =
-                HashSet::with_capacity_and_hasher(old_gr_cols.len(), RepeatableState);
+            let mut unique_grouping_exprs: OrderedSet<GroupingExpression, _> =
+                OrderedSet::with_capacity_and_hasher(old_gr_cols.len(), RepeatableState);
             for gr_expr in old_gr_cols {
                 unique_grouping_exprs.insert(GroupingExpression::new(*gr_expr, self));
             }
-            let grouping_exprs = unique_grouping_exprs
-                .into_iter()
-                .map(|x| x.id)
-                .collect::<Vec<NodeId>>();
+            let grouping_exprs: Vec<NodeId> = unique_grouping_exprs
+                .get_ordered()
+                .iter()
+                .map(|e| e.id)
+                .collect();
             grouping_expr.extend(grouping_exprs.iter());
             self.set_grouping_cols(upper, grouping_exprs)?;
 
@@ -1169,13 +1239,13 @@ impl Plan {
         grouping_exprs: &Vec<NodeId>,
         output_cols: &mut Vec<NodeId>,
     ) -> Result<(LocalAliasesMap, NodeId, Vec<usize>), SbroadError> {
-        let mut unique_grouping_exprs_for_local_stage: HashMap<
+        let mut unique_grouping_exprs_for_local_stage_full: OrderedMap<
             GroupingExpression,
             Rc<String>,
             RepeatableState,
-        > = HashMap::with_hasher(RepeatableState);
+        > = OrderedMap::with_hasher(RepeatableState);
         for gr_expr in grouping_exprs {
-            unique_grouping_exprs_for_local_stage.insert(
+            unique_grouping_exprs_for_local_stage_full.insert(
                 GroupingExpression::new(*gr_expr, self),
                 Rc::new(Self::generate_local_alias(*gr_expr)),
             );
@@ -1201,33 +1271,37 @@ impl Plan {
                 })?
             };
             let expr = GroupingExpression::new(argument, self);
-            if let Some(local_alias) = unique_grouping_exprs_for_local_stage.get(&expr) {
+            if let Some(local_alias) = unique_grouping_exprs_for_local_stage_full.get(&expr) {
                 info.aggr
                     .lagg_alias
                     .insert(info.aggr.kind, local_alias.clone());
             } else {
                 grouping_exprs_from_aggregates.push(argument);
                 let local_alias = Rc::new(Self::generate_local_alias(argument));
-                unique_grouping_exprs_for_local_stage.insert(expr, local_alias.clone());
+                unique_grouping_exprs_for_local_stage_full.insert(expr, local_alias.clone());
                 info.aggr.lagg_alias.insert(info.aggr.kind, local_alias);
             }
         }
 
         // Because of borrow checker we need to remove references to Plan from map
-        let mut unique_grouping_exprs_for_local_stage: HashMap<
+        let mut unique_grouping_exprs_for_local_stage: OrderedMap<
             NodeId,
             Rc<String>,
             RepeatableState,
-        > = unique_grouping_exprs_for_local_stage
-            .into_iter()
-            .map(|(e, s)| (e.id, s))
-            .collect();
+        > = OrderedMap::with_capacity_and_hasher(
+            unique_grouping_exprs_for_local_stage_full.len(),
+            RepeatableState,
+        );
+        for (gr_expr, name) in unique_grouping_exprs_for_local_stage_full.get_ordered() {
+            unique_grouping_exprs_for_local_stage.insert(gr_expr.id, name.clone())
+        }
 
         let mut alias_to_pos: HashMap<Rc<String>, usize> = HashMap::new();
         // add grouping expressions to local projection
-
-        for (pos, (gr_expr, local_alias)) in
-            unique_grouping_exprs_for_local_stage.iter().enumerate()
+        for (pos, (gr_expr, local_alias)) in unique_grouping_exprs_for_local_stage
+            .get_ordered()
+            .iter()
+            .enumerate()
         {
             let new_alias = self.nodes.add_alias(local_alias, *gr_expr)?;
             output_cols.push(new_alias);