diff --git a/sbroad-core/src/backend/sql/tree/tests.rs b/sbroad-core/src/backend/sql/tree/tests.rs index bd765aefd4a822997194548e8355c0c4cc26e59d..563f68ee12c90c3dfdbf28f01d2d9bf3a9a79a2c 100644 --- a/sbroad-core/src/backend/sql/tree/tests.rs +++ b/sbroad-core/src/backend/sql/tree/tests.rs @@ -36,7 +36,7 @@ fn sql_order_selection() { let eq_id = plan.nodes.add_bool(a_id, Bool::Eq, const_row).unwrap(); let select_id = plan.add_select(&[scan_id], eq_id).unwrap(); - let proj_id = plan.add_proj(select_id, &["a"], false).unwrap(); + let proj_id = plan.add_proj(select_id, &["a"], false, false).unwrap(); plan.set_top(proj_id).unwrap(); // check the plan diff --git a/sbroad-core/src/frontend/sql/ir/tests.rs b/sbroad-core/src/frontend/sql/ir/tests.rs index 920aa55714943d0114ae24eb0c5a11350810b1b1..204f4e8dd31ea1e30c82861d036b9e3530cb2cdd 100644 --- a/sbroad-core/src/frontend/sql/ir/tests.rs +++ b/sbroad-core/src/frontend/sql/ir/tests.rs @@ -517,6 +517,65 @@ vtable_max_rows = 5000 assert_eq!(expected_explain, plan.as_explain().unwrap()); } +#[test] +fn front_sql_join_on_bucket_id1() { + let input = r#"select * from "t2" join ( + select "bucket_id" from "test_space" where "id" = 1 + ) as t_mv + on t_mv."bucket_id" = "t2"."bucket_id"; + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h") + join on ROW("T_MV"."bucket_id"::unsigned) = ROW("t2"."bucket_id"::unsigned) + scan "t2" + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h", "t2"."bucket_id"::unsigned -> "bucket_id") + scan "t2" + scan "T_MV" + projection ("test_space"."bucket_id"::unsigned -> "bucket_id") + selection ROW("test_space"."id"::unsigned) = ROW(1::unsigned) + scan "test_space" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + 
assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn front_sql_join_on_bucket_id2() { + let input = r#"select * from "t2" join ( + select "bucket_id" from "test_space" where "id" = 1 + ) as t_mv + on t_mv."bucket_id" = "t2"."bucket_id" or "t2"."e" = "t2"."f"; + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h") + join on ROW("T_MV"."bucket_id"::unsigned) = ROW("t2"."bucket_id"::unsigned) or ROW("t2"."e"::unsigned) = ROW("t2"."f"::unsigned) + scan "t2" + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h", "t2"."bucket_id"::unsigned -> "bucket_id") + scan "t2" + motion [policy: full] + scan "T_MV" + projection ("test_space"."bucket_id"::unsigned -> "bucket_id") + selection ROW("test_space"."id"::unsigned) = ROW(1::unsigned) + scan "test_space" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + #[test] fn front_sql_exists_subquery_select_from_table() { let input = r#"SELECT "id" FROM "test_space" WHERE EXISTS (SELECT 0 FROM "hash_testing")"#; diff --git a/sbroad-core/src/ir/distribution/tests.rs b/sbroad-core/src/ir/distribution/tests.rs index 986f485af0b1843e147e4e7e93e4794ec0423858..60f153f4e07ef457a24aa18a4609cb9ee18ec609 100644 --- a/sbroad-core/src/ir/distribution/tests.rs +++ b/sbroad-core/src/ir/distribution/tests.rs @@ -28,7 +28,7 @@ fn proj_preserve_dist_key() { plan.add_rel(t); let scan_id = plan.add_scan("t", None).unwrap(); - let proj_id = plan.add_proj(scan_id, &["a", "b"], false).unwrap(); + let proj_id = plan.add_proj(scan_id, &["a", "b"], false, false).unwrap(); plan.top = Some(proj_id); diff --git a/sbroad-core/src/ir/expression/tests.rs b/sbroad-core/src/ir/expression/tests.rs index 
9e39f5036a868d78a2c1b91990e54619d8e1c62f..985a37d39eace4c0fec935e1521a44c01dc5a62e 100644 --- a/sbroad-core/src/ir/expression/tests.rs +++ b/sbroad-core/src/ir/expression/tests.rs @@ -55,7 +55,7 @@ fn rel_nodes_from_reference_in_proj() { .unwrap(); plan.add_rel(t); let scan_id = plan.add_scan("t", None).unwrap(); - let proj_id = plan.add_proj(scan_id, &["a"], false).unwrap(); + let proj_id = plan.add_proj(scan_id, &["a"], false, false).unwrap(); let output = plan.get_relational_output(proj_id).unwrap(); let rel_set = plan.get_relational_nodes_from_row(output).unwrap(); diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs index aefd905c1503fa3e1ecd7041d1351e83110828d0..e2ea841c5a25abc2d9aada90a070b0ada12171fc 100644 --- a/sbroad-core/src/ir/operator.rs +++ b/sbroad-core/src/ir/operator.rs @@ -1255,16 +1255,7 @@ impl Plan { // We'll need it later to update the condition expression (borrow checker). let table = self.get_relation_or_error(relation)?; let sharding_column_pos = table.get_bucket_id_position()?; - - // Wrap relation with sub-query scan. - let scan_name = if let Some(alias_name) = alias { - alias_name.clone() - } else { - relation.clone() - }; - let proj_id = self.add_proj(*child, &[], false)?; - let sq_id = self.add_sub_query(proj_id, Some(&scan_name))?; - children.push(sq_id); + let mut needs_bucket_id_column = false; // Update references to the sub-query's output in the condition. let mut condition_tree = BreadthFirst::with_capacity( @@ -1276,13 +1267,31 @@ impl Plan { .iter(condition) .filter_map(|(_, id)| { let expr = self.get_expression_node(id).ok(); - if let Some(Expression::Reference { .. }) = expr { + if let Some(Expression::Reference { position, .. }) = expr { + if Some(*position) == sharding_column_pos { + needs_bucket_id_column = true; + } Some(id) } else { None } }) .collect::<Vec<_>>(); + + // Wrap relation with sub-query scan. 
+ let scan_name = if let Some(alias_name) = alias { + alias_name.clone() + } else { + relation.clone() + }; + let proj_id = self.add_proj(*child, &[], false, needs_bucket_id_column)?; + let sq_id = self.add_sub_query(proj_id, Some(&scan_name))?; + children.push(sq_id); + + if needs_bucket_id_column { + continue; + } + // we should update ONLY references that refer to current child (left, right) let current_target = match join_child { JoinChild::Inner => Some(vec![1_usize]), @@ -1396,8 +1405,9 @@ impl Plan { child: usize, col_names: &[&str], is_distinct: bool, + needs_shard_col: bool, ) -> Result<usize, SbroadError> { - let output = self.add_row_for_output(child, col_names, false)?; + let output = self.add_row_for_output(child, col_names, needs_shard_col)?; let proj = Relational::Projection { children: vec![child], output, diff --git a/sbroad-core/src/ir/operator/tests.rs b/sbroad-core/src/ir/operator/tests.rs index 99f0191f32c0d536f6c97100df0b46aebae979bd..d9b002d148c4a13f01ad4f5ae575f735751b51be 100644 --- a/sbroad-core/src/ir/operator/tests.rs +++ b/sbroad-core/src/ir/operator/tests.rs @@ -112,7 +112,8 @@ fn projection() { // Invalid alias names in the output assert_eq!( SbroadError::NotFound(Entity::Column, r#"with name e"#.into()), - plan.add_proj(scan_id, &["a", "e"], false).unwrap_err() + plan.add_proj(scan_id, &["a", "e"], false, false) + .unwrap_err() ); // Expression node instead of relational one @@ -121,13 +122,13 @@ fn projection() { Entity::Node, Some("node is not Relational type: Expression(Alias { name: \"a\", child: 0 })".into()) ), - plan.add_proj(1, &["a"], false).unwrap_err() + plan.add_proj(1, &["a"], false, false).unwrap_err() ); // Try to build projection from the non-existing node assert_eq!( SbroadError::NotFound(Entity::Node, "from arena with index 42".to_string()), - plan.add_proj(42, &["a"], false).unwrap_err() + plan.add_proj(42, &["a"], false, false).unwrap_err() ); } @@ -467,7 +468,7 @@ fn selection_with_sub_query() { .unwrap(); 
plan.add_rel(t2); let scan_t2_id = plan.add_scan("t2", None).unwrap(); - let proj_id = plan.add_proj(scan_t2_id, &["b"], false).unwrap(); + let proj_id = plan.add_proj(scan_t2_id, &["b"], false, false).unwrap(); let sq_id = plan.add_sub_query(proj_id, None).unwrap(); children.push(sq_id); diff --git a/sbroad-core/src/ir/relation.rs b/sbroad-core/src/ir/relation.rs index 9ae2e0836cc0931a04709e0b137c67e25923e3da..a834b123e8041a001c9ba7871f5f1d9a0acbcc0e 100644 --- a/sbroad-core/src/ir/relation.rs +++ b/sbroad-core/src/ir/relation.rs @@ -27,6 +27,8 @@ use super::distribution::Key; const DEFAULT_VALUE: Value = Value::Null; +pub const SHARD_COL_NAME: &str = "\"bucket_id\""; + /// Supported column types, which is used in a schema only. /// This `Type` is derived from the result's metadata. #[derive(Serialize, Default, Deserialize, PartialEq, Hash, Debug, Eq, Clone)] diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index b60adcb2c82ac2622b7ec92137caff9b99785417..15a63c3bdf53d65cae04580176d6480dc1823156 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -12,7 +12,7 @@ use crate::ir::expression::ColumnPositionMap; use crate::ir::expression::Expression; use crate::ir::operator::{Bool, JoinKind, Relational, Unary, UpdateStrategy}; -use crate::ir::relation::TableKind; +use crate::ir::relation::{TableKind, SHARD_COL_NAME}; use crate::ir::transformation::redistribution::eq_cols::EqualityCols; use crate::ir::tree::traversal::{ BreadthFirst, LevelNode, PostOrder, PostOrderWithFilter, EXPR_CAPACITY, REL_CAPACITY, @@ -27,6 +27,7 @@ pub(crate) mod eq_cols; pub(crate) mod groupby; pub(crate) mod left_join; +#[derive(Debug)] pub(crate) enum JoinChild { Inner, Outer, @@ -912,6 +913,37 @@ impl Plan { left_row_id: usize, right_row_id: usize, ) -> Result<MotionPolicy, SbroadError> { + { + // check for (a, t1.bucket_id, b) = (x, t2.bucket_id, y) + 
let get_shard_pos = |row_id: usize| -> Result<Option<usize>, SbroadError> { + let mut shard_pos = None; + let refs = self.get_row_list(row_id)?; + for (pos, ref_id) in refs.iter().enumerate() { + let node @ Expression::Reference { .. } = self.get_expression_node(*ref_id)? + else { + continue; + }; + + // NB: This code assumes that the user does not shoot themselves in + // the foot by renaming some column into `bucket_id` like here: + // select * from (select "a" as "bucket_id", "bucket_id" as b from "t") join t2 on ... + // If this happens, we will get a wrong plan. + // TODO: forbid renaming some column into `bucket_id` or renaming + // `bucket_id` into something else. + if SHARD_COL_NAME == self.get_alias_from_reference_node(node)? { + shard_pos = Some(pos); + break; + } + } + Ok(shard_pos) + }; + let left_shard_pos = get_shard_pos(left_row_id)?; + let right_shard_pos = get_shard_pos(right_row_id)?; + if left_shard_pos.is_some() && left_shard_pos == right_shard_pos { + return Ok(MotionPolicy::None); + } + } + let left_dist = self.get_distribution(left_row_id)?; let right_dist = self.get_distribution(right_row_id)?; let row_map_left = self.build_row_map(left_row_id)?; diff --git a/sbroad-core/src/ir/transformation/redistribution/left_join.rs b/sbroad-core/src/ir/transformation/redistribution/left_join.rs index a3d6f4819ac965b06f56bb95856a204c13e4756f..887e32efe7942af01fe589b31cf825114b466d25 100644 --- a/sbroad-core/src/ir/transformation/redistribution/left_join.rs +++ b/sbroad-core/src/ir/transformation/redistribution/left_join.rs @@ -68,7 +68,7 @@ impl Plan { fn create_projection(plan: &mut Plan, join_id: usize) -> Result<usize, SbroadError> { let proj_columns_names = collect_projection_columns(plan, join_id)?; let proj_columns_refs: Vec<&str> = proj_columns_names.iter().map(String::as_str).collect(); - let proj_id = plan.add_proj(join_id, &proj_columns_refs, false)?; + let proj_id = plan.add_proj(join_id, &proj_columns_refs, false, false)?; let output_id = 
plan.get_relational_output(proj_id)?; plan.replace_parent_in_subtree(output_id, Some(join_id), Some(proj_id))?; plan.set_distribution(output_id)?; diff --git a/sbroad-core/src/ir/transformation/redistribution/tests.rs b/sbroad-core/src/ir/transformation/redistribution/tests.rs index 8bab97481297c2b4087d66e5e25186cf3840a55c..ef5a34582ec2e1f49f739bf17721b9fdfec7cae0 100644 --- a/sbroad-core/src/ir/transformation/redistribution/tests.rs +++ b/sbroad-core/src/ir/transformation/redistribution/tests.rs @@ -42,7 +42,7 @@ fn full_motion_less_for_sub_query() { .unwrap(); plan.add_rel(t2); let scan_t2_id = plan.add_scan("t2", None).unwrap(); - let proj_id = plan.add_proj(scan_t2_id, &["b"], false).unwrap(); + let proj_id = plan.add_proj(scan_t2_id, &["b"], false, false).unwrap(); let sq_id = plan.add_sub_query(proj_id, None).unwrap(); children.push(sq_id); @@ -105,7 +105,7 @@ fn full_motion_non_segment_outer_for_sub_query() { .unwrap(); plan.add_rel(t2); let scan_t2_id = plan.add_scan("t2", None).unwrap(); - let proj_id = plan.add_proj(scan_t2_id, &["a"], false).unwrap(); + let proj_id = plan.add_proj(scan_t2_id, &["a"], false, false).unwrap(); let sq_id = plan.add_sub_query(proj_id, None).unwrap(); children.push(sq_id); @@ -165,7 +165,7 @@ fn local_sub_query() { .unwrap(); plan.add_rel(t2); let scan_t2_id = plan.add_scan("t2", None).unwrap(); - let proj_id = plan.add_proj(scan_t2_id, &["a"], false).unwrap(); + let proj_id = plan.add_proj(scan_t2_id, &["a"], false, false).unwrap(); let sq_id = plan.add_sub_query(proj_id, None).unwrap(); children.push(sq_id); @@ -227,13 +227,13 @@ fn multiple_sub_queries() { .unwrap(); plan.add_rel(t2); let sq1_scan_t2_id = plan.add_scan("t2", None).unwrap(); - let sq1_proj_id = plan.add_proj(sq1_scan_t2_id, &["a"], false).unwrap(); + let sq1_proj_id = plan.add_proj(sq1_scan_t2_id, &["a"], false, false).unwrap(); let sq1_id = plan.add_sub_query(sq1_proj_id, None).unwrap(); children.push(sq1_id); let sq1_pos = children.len() - 1; let 
sq2_scan_t2_id = plan.add_scan("t2", None).unwrap(); - let sq2_proj_id = plan.add_proj(sq2_scan_t2_id, &["b"], false).unwrap(); + let sq2_proj_id = plan.add_proj(sq2_scan_t2_id, &["b"], false, false).unwrap(); let sq2_id = plan.add_sub_query(sq2_proj_id, None).unwrap(); children.push(sq2_id); let sq2_pos = children.len() - 1; diff --git a/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs b/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs index 70deabb0f0f5437036fb1973239001e3acdc1c57..5e1a5427e58546abdccd1bc16d98b5578372b76f 100644 --- a/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs +++ b/sbroad-core/src/ir/transformation/redistribution/tests/segment.rs @@ -48,7 +48,7 @@ fn sub_query1() { .unwrap(); plan.add_rel(t2); let scan_t2_id = plan.add_scan("t2", None).unwrap(); - let proj_id = plan.add_proj(scan_t2_id, &["b"], false).unwrap(); + let proj_id = plan.add_proj(scan_t2_id, &["b"], false, false).unwrap(); let sq_id = plan.add_sub_query(proj_id, None).unwrap(); children.push(sq_id); diff --git a/sbroad-core/src/ir/tree/tests.rs b/sbroad-core/src/ir/tree/tests.rs index db7abb0c936b1e7541eb59a8015615ac0a9a96d8..b0a0f9f1cc2b374f62779607b45ee19633b94bb3 100644 --- a/sbroad-core/src/ir/tree/tests.rs +++ b/sbroad-core/src/ir/tree/tests.rs @@ -148,7 +148,9 @@ fn selection_subquery_dfs_post() { let const1 = plan.add_const(Value::from(1_u64)); let eq_op = plan.nodes.add_bool(b, Bool::Eq, const1).unwrap(); let selection_t2_id = plan.add_select(&[scan_t2_id], eq_op).unwrap(); - let proj_id = plan.add_proj(selection_t2_id, &["c"], false).unwrap(); + let proj_id = plan + .add_proj(selection_t2_id, &["c"], false, false) + .unwrap(); let sq_id = plan.add_sub_query(proj_id, None).unwrap(); let c = plan.get_row_from_rel_node(sq_id).unwrap(); @@ -223,7 +225,9 @@ fn subtree_dfs_post() { let const1 = plan.add_const(Value::from(1_i64)); let eq_op = plan.nodes.add_bool(a, Bool::Eq, const1).unwrap(); let selection_t1_id = 
plan.add_select(&[scan_t1_id], eq_op).unwrap(); - let proj_id = plan.add_proj(selection_t1_id, &["c"], false).unwrap(); + let proj_id = plan + .add_proj(selection_t1_id, &["c"], false, false) + .unwrap(); plan.set_top(proj_id).unwrap(); let top = plan.get_top().unwrap();