diff --git a/sbroad-core/src/executor/bucket/tests.rs b/sbroad-core/src/executor/bucket/tests.rs index bd072ed9ee3bb1908ebde7764f2304ee037c5818..ec76634b2c3f4c738110e6a42aaadad3d92cb812 100644 --- a/sbroad-core/src/executor/bucket/tests.rs +++ b/sbroad-core/src/executor/bucket/tests.rs @@ -444,3 +444,16 @@ fn global_tbl_join6() { assert_eq!(Buckets::All, buckets); } + +#[test] +fn global_tbl_groupby() { + let query = r#"select "a", avg("b") from "global_t" group by "a" having sum("b") > 10"#; + + let coordinator = RouterRuntimeMock::new(); + let mut query = Query::new(&coordinator, query, vec![]).unwrap(); + let plan = query.exec_plan.get_ir_plan(); + let top = plan.get_top().unwrap(); + let buckets = query.bucket_discovery(top).unwrap(); + + assert_eq!(Buckets::Any, buckets); +} diff --git a/sbroad-core/src/frontend/sql/ir/tests/global.rs b/sbroad-core/src/frontend/sql/ir/tests/global.rs index 7bdd42c17818b9504c5bc8beaff44ab92884b894..5b2db027bea6f67a6dfd5f126bf5013c8ae64419 100644 --- a/sbroad-core/src/frontend/sql/ir/tests/global.rs +++ b/sbroad-core/src/frontend/sql/ir/tests/global.rs @@ -34,16 +34,6 @@ fn front_sql_check_global_tbl_support() { metadata, global_tbl_err!("Except"), ); - check_error( - r#"select sum("a") from "global_t""#, - metadata, - global_tbl_err!("Aggregate"), - ); - check_error( - r#"select "a" from "global_t" group by "a""#, - metadata, - global_tbl_err!("GroupBy"), - ); check_error( r#"insert into "global_t" values (1, 1)"#, metadata, @@ -67,7 +57,6 @@ fn front_sql_check_global_tbl_support() { fn check_error(input: &str, metadata: &RouterConfigurationMock, expected_err: &str) { let res = build(input, metadata); - if res.is_ok() {} let err = res.unwrap_err(); assert_eq!(true, err.to_string().contains(expected_err)); @@ -789,3 +778,122 @@ vtable_max_rows = 5000 ); assert_eq!(expected_explain, plan.as_explain().unwrap()); } + +#[test] +fn front_sql_global_aggregate1() { + let input = r#" + select sum("a") + avg("b" + "b") from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + println!("{}", plan.as_explain().unwrap()); + + let expected_explain = String::from( + r#"projection (ROW(sum(("global_t"."a"::integer))::decimal) + ROW(avg((ROW("global_t"."b"::integer) + ROW("global_t"."b"::integer)))::decimal) -> "COL_1") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn front_sql_global_aggregate2() { + let input = r#" + select sum("a") + avg("b" + "b") from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + println!("{}", plan.as_explain().unwrap()); + + let expected_explain = String::from( + r#"projection (ROW(sum(("global_t"."a"::integer))::decimal) + ROW(avg((ROW("global_t"."b"::integer) + ROW("global_t"."b"::integer)))::decimal) -> "COL_1") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn front_sql_global_aggregate3() { + let input = r#" + select "b"+"a", sum("a") from "global_t" + group by "b"+"a" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + println!("{}", plan.as_explain().unwrap()); + + let expected_explain = String::from( + r#"projection (ROW("global_t"."b"::integer) + ROW("global_t"."a"::integer) -> "COL_1", sum(("global_t"."a"::integer))::decimal -> "COL_2") + group by (ROW("global_t"."b"::integer) + ROW("global_t"."a"::integer)) output: ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn front_sql_global_aggregate4() { + let input = r#" + select "b"+"a", sum("a") from "global_t" + group by "b"+"a" + having avg("b") > 3 + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + println!("{}", plan.as_explain().unwrap()); + + let expected_explain = String::from( + r#"projection (ROW("global_t"."b"::integer) + ROW("global_t"."a"::integer) -> "COL_1", sum(("global_t"."a"::integer))::decimal -> "COL_2") + having ROW(avg(("global_t"."b"::integer))::decimal) > ROW(3::unsigned) + group by (ROW("global_t"."b"::integer) + ROW("global_t"."a"::integer)) output: ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn front_sql_global_aggregate5() { + let input = r#" + select "b"+"a", sum("a") from "global_t" + where ("a", "b") in (select "e", "f" from "t2") + group by "b"+"a" + having avg("b") > 3 + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"projection ("column_44"::integer -> "COL_1", sum(("sum_70"::decimal))::decimal -> "COL_2") + having ROW((sum(("sum_52"::decimal::double))::decimal / sum(("count_52"::integer::double))::decimal)) > ROW(3::unsigned) + group by ("column_44"::integer) output: ("column_44"::integer -> "column_44", "sum_52"::decimal -> "sum_52", "sum_70"::decimal -> "sum_70", "count_52"::integer -> "count_52") + motion [policy: segment([ref("column_44")])] + scan + projection (ROW("global_t"."b"::integer) + ROW("global_t"."a"::integer) -> "column_44", sum(("global_t"."b"::integer))::decimal -> "sum_52", sum(("global_t"."a"::integer))::decimal -> "sum_70", count(("global_t"."b"::integer))::integer -> "count_52") + group by (ROW("global_t"."b"::integer) + ROW("global_t"."a"::integer)) output: ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + selection ROW("global_t"."a"::integer, "global_t"."b"::integer) in ROW($0, $0) + scan "global_t" +subquery $0: +scan + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f") + scan "t2" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} diff --git a/sbroad-core/src/ir/distribution.rs b/sbroad-core/src/ir/distribution.rs index 26eca46d5544291c31b559baf2569dc6bccc3d0f..76b80439418f1eb2c7764bc2dd8d8468c1610fd5 100644 --- a/sbroad-core/src/ir/distribution.rs +++ b/sbroad-core/src/ir/distribution.rs @@ -155,12 +155,7 @@ impl Distribution { } /// Calculate a new distribution for the tuple combined from two different tuples. - fn join( - left: &Distribution, - right: &Distribution, - plan: &Plan, - join_id: usize, - ) -> Result<Distribution, SbroadError> { + fn join(left: &Distribution, right: &Distribution) -> Result<Distribution, SbroadError> { let dist = match (left, right) { (Distribution::Any, Distribution::Any) => Distribution::Any, (Distribution::Single, Distribution::Global | Distribution::Single) @@ -689,9 +684,7 @@ impl Plan { Relational::Except { .. } | Relational::UnionAll { .. } => { Distribution::union_except(&left_dist, &right_dist)? } - Relational::Join { .. } => { - Distribution::join(&left_dist, &right_dist, self, parent_id)? - } + Relational::Join { .. } => Distribution::join(&left_dist, &right_dist)?, _ => { return Err(SbroadError::Invalid( Entity::Relational, diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index f95bd944eafff91eb88f8932d82ba9edc931b528..b40c83546a09ff1c6163bd9cc4abfe8e4d9e7f67 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -973,7 +973,7 @@ impl Plan { pub(crate) fn check_global_tbl_support(&self, rel_id: usize) -> Result<(), SbroadError> { let node = self.get_relation_node(rel_id)?; match node { - Relational::Values { .. } | Relational::GroupBy { .. } | Relational::Having { .. } => { + Relational::Values { .. } => { let child_dist = self.get_distribution( self.get_relational_output(self.get_relational_child(rel_id, 0)?)?, )?; @@ -1033,6 +1033,8 @@ impl Plan { } } Relational::Projection { .. } + | Relational::GroupBy { .. } + | Relational::Having { .. } | Relational::ScanRelation { .. } | Relational::Selection { .. } | Relational::ValuesRow { .. } @@ -1956,10 +1958,11 @@ impl Plan { let child_dist = self.get_distribution( self.get_relational_output(self.get_relational_child(id, 0)?)?, )?; - if matches!(child_dist, Distribution::Single) { - // If child has Distribution::Single and this Projection contains - // aggregates or there is GroupBy, then we don't need two stage - // transformation, we can calculate aggregates / GroupBy in one + if matches!(child_dist, Distribution::Single | Distribution::Global) { + // If child has Single or Global distribution and this Projection + // contains aggregates or there is GroupBy, + // then we don't need two stage transformation, + // we can calculate aggregates / GroupBy in one // stage, because all data will reside on a single node. self.set_dist(proj_output_id, child_dist.clone())?; } else if !self.add_two_stage_aggregation(id)? { diff --git a/sbroad-core/src/ir/transformation/redistribution/groupby.rs b/sbroad-core/src/ir/transformation/redistribution/groupby.rs index f54391604c49c7574361d4b80559fcf2b55de955..db2a762ac1542850d4b0c52a4c479b05218495b9 100644 --- a/sbroad-core/src/ir/transformation/redistribution/groupby.rs +++ b/sbroad-core/src/ir/transformation/redistribution/groupby.rs @@ -1624,15 +1624,6 @@ impl Plan { let (finals, upper) = self.split_reduce_stage(final_proj_id)?; let mut aggr_infos = self.collect_aggregates(&finals)?; let has_aggregates = !aggr_infos.is_empty(); - if has_aggregates { - if let Distribution::Global = - self.get_distribution(self.get_relational_output(upper)?)? - { - return Err(SbroadError::UnsupportedOpForGlobalTables( - "Aggregate".to_string(), - )); - } - } let (upper, grouping_exprs, gr_expr_map) = self.collect_grouping_expressions(upper, &finals, has_aggregates)?; if grouping_exprs.is_empty() && aggr_infos.is_empty() {