From 73576c4bd9dd047016fbcf287e83452aad8b3416 Mon Sep 17 00:00:00 2001 From: Arseniy Volynets <vol0ncar@yandex.ru> Date: Fri, 22 Dec 2023 13:15:20 +0300 Subject: [PATCH] feat: support except for global tbls --- sbroad-core/src/backend/sql/ir.rs | 1 + sbroad-core/src/backend/sql/tree.rs | 10 +- sbroad-core/src/executor.rs | 28 +- sbroad-core/src/executor/bucket.rs | 35 ++ sbroad-core/src/executor/ir.rs | 1 + sbroad-core/src/executor/tests/exec_plan.rs | 72 +++- sbroad-core/src/frontend/sql/ir.rs | 343 ++++++++++++++++++ .../src/frontend/sql/ir/tests/global.rs | 247 ++++++++++++- sbroad-core/src/ir.rs | 3 +- sbroad-core/src/ir/api/parameter.rs | 2 +- sbroad-core/src/ir/explain.rs | 13 + sbroad-core/src/ir/helpers.rs | 3 + sbroad-core/src/ir/operator.rs | 18 + .../src/ir/transformation/redistribution.rs | 270 +++++++------- sbroad-core/src/ir/tree/relation.rs | 1 + sbroad-core/src/ir/tree/subtree.rs | 9 +- sbroad-core/src/ir/tree/tests.rs | 3 +- 17 files changed, 898 insertions(+), 161 deletions(-) diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs index 7719b799c2..9c9adc24e5 100644 --- a/sbroad-core/src/backend/sql/ir.rs +++ b/sbroad-core/src/backend/sql/ir.rs @@ -311,6 +311,7 @@ impl ExecutionPlan { Node::Relational(rel) => match rel { Relational::Except { .. } => sql.push_str("EXCEPT"), Relational::GroupBy { .. } => sql.push_str("GROUP BY"), + Relational::Intersect { .. } => sql.push_str("INTERSECT"), Relational::Having { .. } => sql.push_str("HAVING"), Relational::Delete { .. } => { return Err(SbroadError::Unsupported( diff --git a/sbroad-core/src/backend/sql/tree.rs b/sbroad-core/src/backend/sql/tree.rs index 8c32f9caa6..e1c609029d 100644 --- a/sbroad-core/src/backend/sql/tree.rs +++ b/sbroad-core/src/backend/sql/tree.rs @@ -785,16 +785,18 @@ impl<'p> SyntaxPlan<'p> { ); Ok(self.nodes.push_syntax_node(sn)) } - Relational::Except { children, .. } | Relational::UnionAll { children, .. } => { + Relational::Except { children, .. } + | Relational::UnionAll { children, .. } + | Relational::Intersect { children, .. } => { let left_id = *children.first().ok_or_else(|| { SbroadError::UnexpectedNumberOfValues( - "Union/Except has no children.".into(), + "Union/Except/Intersect has no children.".into(), ) })?; let right_id = *children.get(1).ok_or_else(|| { SbroadError::NotFound( Entity::Node, - "that is Union/Except right child.".into(), + "that is Union/Except/Intersect right child.".into(), ) })?; let sn = SyntaxNode::new_pointer( @@ -1311,7 +1313,7 @@ impl<'p> SyntaxPlan<'p> { match snapshot { Snapshot::Latest => { let mut dft_post = - PostOrder::with_capacity(|node| ir_plan.subtree_iter(node), capacity); + PostOrder::with_capacity(|node| ir_plan.subtree_iter(node, false), capacity); for (_, id) in dft_post.iter(top) { // it works only for post-order traversal let sn_id = sp.add_plan_node(id)?; diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs index a264cc0a0b..450a80bcda 100644 --- a/sbroad-core/src/executor.rs +++ b/sbroad-core/src/executor.rs @@ -25,7 +25,6 @@ use std::any::Any; use std::collections::HashMap; -use std::rc::Rc; use crate::errors::{Action, Entity, SbroadError}; use crate::executor::bucket::Buckets; @@ -39,7 +38,6 @@ use crate::ir::transformation::redistribution::MotionPolicy; use crate::ir::value::Value; use crate::ir::Plan; use crate::otm::{child_span, query_id}; -use ahash::AHashMap; use sbroad_proc::otm_child_span; pub mod bucket; @@ -192,35 +190,21 @@ where if self.is_explain() { return self.coordinator.explain_format(self.to_explain()?); } - self.get_mut_exec_plan() .get_mut_ir_plan() .restore_constants()?; let slices = self.exec_plan.get_ir_plan().clone_slices(); - let mut already_materialized: AHashMap<usize, usize> = - AHashMap::with_capacity(slices.slices().len()); for slice in slices.slices() { // TODO: make it work in parallel for motion_id in slice.positions() { - let top_id = self.exec_plan.get_motion_subtree_root(*motion_id)?; - - // Multiple motions can point to the same subtree (BETWEEN operator). - if let Some(id) = already_materialized.get(&top_id) { - let vtable = self.get_exec_plan().get_motion_vtable(*id)?; - if let Some(vtables) = self.get_mut_exec_plan().get_mut_vtables() { - vtables.insert(*motion_id, Rc::clone(&vtable)); + if let Some(vtables_map) = self.exec_plan.get_vtables() { + if vtables_map.contains_key(motion_id) { + continue; } - // Unlink the subtree under the motion node - // (it has already been materialized and replaced with invalid nodes). - // Such a logic is applied in `materialize_motion`, but only for the - // first `Motion` (that e.g. pointed to BETWEEN). - // We have to apply it again, but for current `motion_id` now. - self.get_mut_exec_plan().unlink_motion_subtree(*motion_id)?; - continue; } - let motion = self.exec_plan.get_ir_plan().get_relation_node(*motion_id)?; + if let Relational::Motion { policy, .. } = motion { match policy { // Local segment motions should be treated as a special case. @@ -241,7 +225,6 @@ where &self.coordinator, )?; self.get_mut_exec_plan().unlink_motion_subtree(*motion_id)?; - already_materialized.insert(top_id, *motion_id); } continue; } @@ -252,6 +235,7 @@ where } } + let top_id = self.exec_plan.get_motion_subtree_root(*motion_id)?; let buckets = self.bucket_discovery(top_id)?; let virtual_table = self.coordinator.materialize_motion( &mut self.exec_plan, @@ -260,10 +244,8 @@ where )?; self.exec_plan .set_motion_vtable(*motion_id, virtual_table, &self.coordinator)?; - already_materialized.insert(top_id, *motion_id); } } - let top_id = self.exec_plan.get_ir_plan().get_top()?; let buckets = self.bucket_discovery(top_id)?; self.coordinator diff --git a/sbroad-core/src/executor/bucket.rs b/sbroad-core/src/executor/bucket.rs index 6aa51bd974..a6a71eff90 100644 --- a/sbroad-core/src/executor/bucket.rs +++ b/sbroad-core/src/executor/bucket.rs @@ -495,6 +495,41 @@ where )); } } + Relational::Intersect { + children, output, .. + } => { + if let (Some(first_id), Some(second_id), None) = + (children.first(), children.get(1), children.get(2)) + { + let first_rel = + self.exec_plan.get_ir_plan().get_relation_node(*first_id)?; + let second_rel = + self.exec_plan.get_ir_plan().get_relation_node(*second_id)?; + let first_buckets = + self.bucket_map.get(&first_rel.output()).ok_or_else(|| { + SbroadError::FailedTo( + Action::Retrieve, + Some(Entity::Buckets), + "of the first intersect child from the bucket map.".to_string(), + ) + })?; + let second_buckets = + self.bucket_map.get(&second_rel.output()).ok_or_else(|| { + SbroadError::FailedTo( + Action::Retrieve, + Some(Entity::Buckets), + "of the second intersect child from the bucket map." + .to_string(), + ) + })?; + let buckets = first_buckets.disjunct(second_buckets)?; + self.bucket_map.insert(*output, buckets); + } else { + return Err(SbroadError::UnexpectedNumberOfValues( + "current node should have exactly two children".to_string(), + )); + } + } Relational::Selection { children, filter, diff --git a/sbroad-core/src/executor/ir.rs b/sbroad-core/src/executor/ir.rs index db8ec751e9..7ffa4eca1f 100644 --- a/sbroad-core/src/executor/ir.rs +++ b/sbroad-core/src/executor/ir.rs @@ -276,6 +276,7 @@ impl ExecutionPlan { Relational::ScanSubQuery { .. } => self.get_subquery_child(*top_id), Relational::Except { .. } | Relational::GroupBy { .. } + | Relational::Intersect { .. } | Relational::Join { .. } | Relational::Projection { .. } | Relational::ScanRelation { .. } diff --git a/sbroad-core/src/executor/tests/exec_plan.rs b/sbroad-core/src/executor/tests/exec_plan.rs index 47efc30a86..e5c1186a8f 100644 --- a/sbroad-core/src/executor/tests/exec_plan.rs +++ b/sbroad-core/src/executor/tests/exec_plan.rs @@ -1,9 +1,10 @@ +use std::rc::Rc; + use itertools::Itertools; use pretty_assertions::assert_eq; use crate::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan}; use crate::collection; -use crate::executor::engine::helpers::filter_vtable; use crate::executor::engine::mock::{ReplicasetDispatchInfo, RouterRuntimeMock, VshardMock}; use crate::ir::relation::Type; use crate::ir::tests::{column_integer_user_non_null, column_user_non_null}; @@ -870,7 +871,7 @@ fn global_union_all2() { let mut virtual_table = VirtualTable::new(); virtual_table.add_column(column_integer_user_non_null(String::from("e"))); virtual_table.add_tuple(vec![Value::Integer(1)]); - let mut exec_plan = query.get_mut_exec_plan(); + let exec_plan = query.get_mut_exec_plan(); exec_plan .set_motion_vtable(motion_id, virtual_table.clone(), &coordinator) .unwrap(); @@ -1086,3 +1087,70 @@ fn global_union_all4() { assert_eq!(expected, actual_dispatch); } + +#[test] +fn global_except() { + let sql = r#"select "a" from "global_t" + except select "e" from "t2""#; + let mut coordinator = RouterRuntimeMock::new(); + coordinator.set_vshard_mock(3); + + let mut query = Query::new(&coordinator, sql, vec![]).unwrap(); + + let intersect_motion_id = *query + .exec_plan + .get_ir_plan() + .clone_slices() + .slice(0) + .unwrap() + .position(0) + .unwrap(); + + { + // check map stage + let motion_child = query + .exec_plan + .get_motion_subtree_root(intersect_motion_id) + .unwrap(); + let buckets = query.bucket_discovery(motion_child).unwrap(); + let sql = get_sql_from_execution_plan( + &mut query.exec_plan, + motion_child, + Snapshot::Oldest, + &buckets, + "test", + ); + assert_eq!( + sql, + PatternWithParams::new( + r#"SELECT "t2"."e" FROM "t2" INTERSECT SELECT "global_t"."a" FROM "global_t""# + .to_string(), + vec![] + ) + ); + + let mut virtual_table = VirtualTable::new(); + virtual_table.add_column(column_integer_user_non_null(String::from("e"))); + virtual_table.add_tuple(vec![Value::Integer(1)]); + query + .get_mut_exec_plan() + .set_motion_vtable(intersect_motion_id, virtual_table.clone(), &coordinator) + .unwrap(); + } + + // check reduce stage + let res = *query + .dispatch() + .unwrap() + .downcast::<ProducerResult>() + .unwrap(); + let mut expected = ProducerResult::new(); + expected.rows.extend(vec![vec![ + LuaValue::String(format!("Execute query locally")), + LuaValue::String(String::from(PatternWithParams::new( + r#"SELECT "global_t"."a" FROM "global_t" EXCEPT SELECT "e" FROM "TMP_test_47""#.into(), + vec![], + ))), + ]]); + assert_eq!(expected, res,) +} diff --git a/sbroad-core/src/frontend/sql/ir.rs b/sbroad-core/src/frontend/sql/ir.rs index de44f181f9..f7e29aef10 100644 --- a/sbroad-core/src/frontend/sql/ir.rs +++ b/sbroad-core/src/frontend/sql/ir.rs @@ -8,6 +8,7 @@ use crate::frontend::sql::ast::{ParseNode, Type}; use crate::ir::expression::Expression; use crate::ir::helpers::RepeatableState; use crate::ir::operator::{Arithmetic, Bool, Relational, Unary}; +use crate::ir::transformation::redistribution::MotionOpcode; use crate::ir::tree::traversal::{PostOrder, EXPR_CAPACITY}; use crate::ir::value::double::Double; use crate::ir::value::Value; @@ -433,5 +434,347 @@ impl Plan { } } +/// Helper struct to clone plan's subtree. +/// Assumes that all parameters are bound. +pub struct SubtreeCloner { + old_new: AHashMap<usize, usize>, + nodes_with_backward_references: Vec<usize>, +} + +impl SubtreeCloner { + fn new(capacity: usize) -> Self { + SubtreeCloner { + old_new: AHashMap::with_capacity(capacity), + nodes_with_backward_references: Vec::new(), + } + } + + fn get_new_id(&self, old_id: usize) -> Result<usize, SbroadError> { + self.old_new + .get(&old_id) + .ok_or_else(|| { + SbroadError::Invalid( + Entity::Plan, + Some(format!("new node not found for old id: {old_id}")), + ) + }) + .copied() + } + + fn copy_list(&self, list: &[usize]) -> Result<Vec<usize>, SbroadError> { + let mut new_list = Vec::with_capacity(list.len()); + for id in list { + new_list.push(self.get_new_id(*id)?); + } + Ok(new_list) + } + + fn clone_expression(&mut self, expr: &Expression) -> Result<Expression, SbroadError> { + let mut copied = expr.clone(); + + // note: all struct fields are listed explicitly (instead of `..`), so that + // when a new field is added to a struct, this match must + // be updated, or compilation will fail. + match &mut copied { + Expression::Constant { value: _ } + | Expression::Reference { + parent: _, + targets: _, + position: _, + col_type: _, + } + | Expression::CountAsterisk => {} + Expression::Alias { + ref mut child, + name: _, + } + | Expression::Cast { + ref mut child, + to: _, + } + | Expression::Unary { + ref mut child, + op: _, + } => { + *child = self.get_new_id(*child)?; + } + Expression::Bool { + ref mut left, + ref mut right, + op: _, + } + | Expression::Arithmetic { + ref mut left, + ref mut right, + op: _, + with_parentheses: _, + } + | Expression::Concat { + ref mut left, + ref mut right, + } => { + *left = self.get_new_id(*left)?; + *right = self.get_new_id(*right)?; + } + Expression::Row { + list: ref mut children, + distribution: _, + } + | Expression::StableFunction { + ref mut children, + name: _, + is_distinct: _, + func_type: _, + } => { + *children = self.copy_list(&*children)?; + } + } + + Ok(copied) + } + + #[allow(clippy::too_many_lines)] + fn clone_relational( + &mut self, + old_relational: &Relational, + id: usize, + ) -> Result<Relational, SbroadError> { + let mut copied = old_relational.clone(); + + // all relational nodes have output and children list, + // which must be copied. + if let Some(children) = old_relational.children() { + let new_children = self.copy_list(children)?; + copied.set_children(new_children)?; + } + let new_output_id = self.get_new_id(old_relational.output())?; + *copied.mut_output() = new_output_id; + + // copy node specific fields, that reference other plan nodes + + // note: all struct fields are listed explicitly (instead of `..`), so that + // when a new field is added to a struct, this match must + // be updated, or compilation will fail. + match &mut copied { + Relational::Except { + children: _, + output: _, + } + | Relational::Intersect { + children: _, + output: _, + } + | Relational::UnionAll { + children: _, + output: _, + } + | Relational::Values { + output: _, + children: _, + } + | Relational::Projection { + children: _, + output: _, + is_distinct: _, + } + | Relational::Insert { + relation: _, + columns: _, + children: _, + output: _, + conflict_strategy: _, + } + | Relational::Update { + relation: _, + children: _, + update_columns_map: _, + strategy: _, + pk_positions: _, + output: _, + } + | Relational::Delete { + relation: _, + children: _, + output: _, + } + | Relational::ScanRelation { + alias: _, + output: _, + relation: _, + } + | Relational::ScanSubQuery { + alias: _, + children: _, + output: _, + } => {} + Relational::Having { + children: _, + output: _, + filter, + } + | Relational::Selection { + children: _, + filter, + output: _, + } + | Relational::Join { + children: _, + condition: filter, + output: _, + kind: _, + } => { + *filter = self.get_new_id(*filter)?; + } + Relational::Motion { + alias: _, + children: _, + policy: _, + program, + output: _, + is_child_subquery: _, + } => { + for op in &mut program.0 { + match op { + MotionOpcode::RearrangeForShardedUpdate { + update_id: _, + old_shard_columns_len: _, + new_shard_columns_positions: _, + } => { + // Update -> Motion -> ... + // Update is not copied yet. + self.nodes_with_backward_references.push(id); + } + MotionOpcode::AddMissingRowsForLeftJoin { motion_id } => { + // Projection -> THIS Motion -> Projection -> InnerJoin -> Motion (== motion_id) + // so it is safe to look up motion_id in map + *motion_id = self.get_new_id(*motion_id)?; + } + MotionOpcode::PrimaryKey(_) + | MotionOpcode::ReshardIfNeeded + | MotionOpcode::SerializeAsEmptyTable(_) => {} + } + } + } + Relational::GroupBy { + children: _, + gr_cols, + output: _, + is_final: _, + } => { + *gr_cols = self.copy_list(gr_cols)?; + } + Relational::ValuesRow { + output: _, + data, + children: _, + } => { + *data = self.get_new_id(*data)?; + } + } + + Ok(copied) + } + + // Some nodes contain references to nodes above in the tree + // This function replaces those references to new nodes. + fn replace_backward_refs(&self, plan: &mut Plan) -> Result<(), SbroadError> { + for old_id in &self.nodes_with_backward_references { + if let Node::Relational(Relational::Motion { program, .. }) = plan.get_node(*old_id)? { + let op_cnt = program.0.len(); + for idx in 0..op_cnt { + let op = plan.get_motion_opcode(*old_id, idx)?; + if let MotionOpcode::RearrangeForShardedUpdate { update_id, .. } = op { + let new_motion_id = self.get_new_id(*old_id)?; + let new_update_id = self.get_new_id(*update_id)?; + + if let Relational::Motion { + program: new_program, + .. + } = plan.get_mut_relation_node(new_motion_id)? + { + if let Some(MotionOpcode::RearrangeForShardedUpdate { + update_id: new_node_update_id, + .. + }) = new_program.0.get_mut(idx) + { + *new_node_update_id = new_update_id; + } + } + } + } + } + } + + Ok(()) + } + + fn clone( + &mut self, + plan: &mut Plan, + top_id: usize, + capacity: usize, + ) -> Result<usize, SbroadError> { + let mut dfs = PostOrder::with_capacity(|x| plan.subtree_iter(x, true), capacity); + dfs.populate_nodes(top_id); + let nodes = dfs.take_nodes(); + drop(dfs); + for (_, id) in nodes { + let node = plan.get_node(id)?; + let new_node = match node { + Node::Relational(rel) => Node::Relational(self.clone_relational(rel, id)?), + Node::Expression(expr) => Node::Expression(self.clone_expression(expr)?), + _ => { + return Err(SbroadError::Invalid( + Entity::Node, + Some(format!( + "clone: expected relational or expression on id: {id}" + )), + )) + } + }; + let new_id = plan.nodes.push(new_node); + let old = self.old_new.insert(id, new_id); + if let Some(old_new_id) = old { + return Err(SbroadError::Invalid( + Entity::Plan, + Some(format!( + "clone: node with id {id} was mapped twice: {old_new_id}, {new_id}" + )), + )); + } + } + + self.replace_backward_refs(plan)?; + + let new_top_id = self + .old_new + .get(&top_id) + .ok_or_else(|| { + SbroadError::Invalid( + Entity::Plan, + Some(format!("invalid subtree traversal with top: {top_id}")), + ) + }) + .copied()?; + Ok(new_top_id) + } + + /// Clones the given subtree to the plan arena and returns new `top_id`. + /// Assumes that all parameters are bound and there are no parameters + /// in the subtree. + /// + /// # Errors + /// - invalid plan subtree, e.g some node is met twice in the plan + /// - parameters/ddl/acl nodes are found in subtree + pub fn clone_subtree( + plan: &mut Plan, + top_id: usize, + subtree_capacity: usize, + ) -> Result<usize, SbroadError> { + let mut helper = Self::new(subtree_capacity); + helper.clone(plan, top_id, subtree_capacity) + } +} + #[cfg(test)] mod tests; diff --git a/sbroad-core/src/frontend/sql/ir/tests/global.rs b/sbroad-core/src/frontend/sql/ir/tests/global.rs index 5df84052d3..399d39adc0 100644 --- a/sbroad-core/src/frontend/sql/ir/tests/global.rs +++ b/sbroad-core/src/frontend/sql/ir/tests/global.rs @@ -6,6 +6,7 @@ use crate::ir::distribution::Distribution; use crate::ir::operator::Relational; use crate::ir::transformation::helpers::sql_to_optimized_ir; use crate::ir::tree::traversal::{FilterFn, PostOrderWithFilter, REL_CAPACITY}; +use crate::ir::value::Value; use crate::ir::{Node, Plan}; use pretty_assertions::assert_eq; @@ -19,11 +20,6 @@ fn front_sql_check_global_tbl_support() { let metadata = &RouterConfigurationMock::new(); - check_error( - r#"select "a" from "global_t" except select * from (select "a" as "oa" from "t3")"#, - metadata, - global_tbl_err!("Except"), - ); check_error( r#"insert into "global_t" values (1, 1)"#, metadata, @@ -1138,3 +1134,244 @@ vtable_max_rows = 5000 check_union_dist(&plan, &[DistMock::Global]); } + +#[test] +fn check_plan_except_global_vs_segment() { + let input = r#" + select "a", "b" from "global_t" + where "a" = ? + except + select "e", "f" from "t2" + "#; + + let plan = sql_to_optimized_ir(input, vec![Value::Unsigned(1)]); + + // TODO: the subtree for left except child is reused + // from another motion, show this in explain + let expected_explain = String::from( + r#"except + projection ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + selection ROW("global_t"."a"::integer) = ROW(1::unsigned) + scan "global_t" + motion [policy: full] + intersect + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f") + scan "t2" + projection ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + selection ROW("global_t"."a"::integer) = ROW(1::unsigned) + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_global_vs_any() { + let input = r#" + select "a" from "global_t" + except + select "e" from "t2" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + // TODO: the subtree for left except child is reused + // from another motion, show this in explain + let expected_explain = String::from( + r#"except + projection ("global_t"."a"::integer -> "a") + scan "global_t" + motion [policy: full] + intersect + projection ("t2"."e"::unsigned -> "e") + scan "t2" + projection ("global_t"."a"::integer -> "a") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_global_vs_global() { + let input = r#" + select "a" from "global_t" + except + select "b" from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"except + projection ("global_t"."a"::integer -> "a") + scan "global_t" + projection ("global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_global_vs_single() { + let input = r#" + select "a" from "global_t" + except + select sum("e") from "t2" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"except + projection ("global_t"."a"::integer -> "a") + scan "global_t" + projection (sum(("sum_23"::decimal))::decimal -> "COL_1") + motion [policy: full] + scan + projection (sum(("t2"."e"::unsigned))::decimal -> "sum_23") + scan "t2" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_single_vs_global() { + let input = r#" + select sum("e") from "t2" + except + select "a" from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"except + projection (sum(("sum_13"::decimal))::decimal -> "COL_1") + motion [policy: full] + scan + projection (sum(("t2"."e"::unsigned))::decimal -> "sum_13") + scan "t2" + projection ("global_t"."a"::integer -> "a") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_segment_vs_global() { + let input = r#" + select "e", "f" from "t2" + except + select "a", "b" from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"except + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f") + scan "t2" + projection ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_any_vs_global() { + let input = r#" + select "e" from "t2" + except + select "b" from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"except + projection ("t2"."e"::unsigned -> "e") + scan "t2" + projection ("global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn check_plan_except_non_trivial_global_subtree_vs_any() { + // check that plan is correctly built when left global + // subtree is something more difficult than a scan of + // a global table + let input = r#" + select "b" from "global_t" + left join (select "b" as b from "global_t") + on "a" = b + where "a" = 1 + except + select "e" from "t2" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"except + projection ("global_t"."b"::integer -> "b") + selection ROW("global_t"."a"::integer) = ROW(1::unsigned) + left join on ROW("global_t"."a"::integer) = ROW("B"::integer) + scan "global_t" + projection ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + scan "global_t" + scan + projection ("global_t"."b"::integer -> "B") + scan "global_t" + motion [policy: full] + intersect + projection ("t2"."e"::unsigned -> "e") + scan "t2" + projection ("global_t"."b"::integer -> "b") + selection ROW("global_t"."a"::integer) = ROW(1::unsigned) + left join on ROW("global_t"."a"::integer) = ROW("B"::integer) + scan "global_t" + projection ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + scan "global_t" + scan + projection ("global_t"."b"::integer -> "B") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} diff --git a/sbroad-core/src/ir.rs b/sbroad-core/src/ir.rs index c42ae0533e..f3102e681d 100644 --- a/sbroad-core/src/ir.rs +++ b/sbroad-core/src/ir.rs @@ -1155,7 +1155,8 @@ impl Plan { /// # Errors /// - serialization error (to binary) pub fn pattern_id(&self, top_id: usize) -> Result<String, SbroadError> { - let mut dfs = PostOrder::with_capacity(|x| self.subtree_iter(x), self.nodes.next_id()); + let mut dfs = + PostOrder::with_capacity(|x| self.subtree_iter(x, false), self.nodes.next_id()); dfs.populate_nodes(top_id); let nodes = dfs.take_nodes(); let mut plan_nodes: Vec<&Node> = Vec::with_capacity(nodes.len()); diff --git a/sbroad-core/src/ir/api/parameter.rs b/sbroad-core/src/ir/api/parameter.rs index b3eabab9d8..4d56bec13a 100644 --- a/sbroad-core/src/ir/api/parameter.rs +++ b/sbroad-core/src/ir/api/parameter.rs @@ -50,7 +50,7 @@ impl Plan { } let capacity = self.next_id(); - let mut tree = PostOrder::with_capacity(|node| self.subtree_iter(node), capacity); + let mut tree = PostOrder::with_capacity(|node| self.subtree_iter(node, false), capacity); let top_id = self.get_top()?; tree.populate_nodes(top_id); let nodes = tree.take_nodes(); diff --git a/sbroad-core/src/ir/explain.rs b/sbroad-core/src/ir/explain.rs index f2363ddc4b..3e452f9f5b 100644 --- a/sbroad-core/src/ir/explain.rs +++ b/sbroad-core/src/ir/explain.rs @@ -744,6 +744,7 @@ impl Display for InnerJoin { enum ExplainNode { Delete(String), Except, + Intersect, GroupBy(GroupBy), InnerJoin(InnerJoin), ValueRow(ColExpr), @@ -774,6 +775,7 @@ impl Display for ExplainNode { ExplainNode::Selection(s) => format!("selection {s}"), ExplainNode::Having(s) => format!("having {s}"), ExplainNode::UnionAll => "union all".to_string(), + ExplainNode::Intersect => "intersect".to_string(), ExplainNode::Update(u) => u.to_string(), ExplainNode::SubQuery(s) => s.to_string(), ExplainNode::Motion(m) => m.to_string(), @@ -880,6 +882,17 @@ impl FullExplain { let mut current_node = ExplainTreePart::with_level(level); let node = ir.get_relation_node(id)?; current_node.current = match &node { + Relational::Intersect { .. } => { + if let (Some(right), Some(left)) = (stack.pop(), stack.pop()) { + current_node.children.push(left); + current_node.children.push(right); + } else { + return Err(SbroadError::UnexpectedNumberOfValues( + "Intersect node must have exactly two children".into(), + )); + } + Some(ExplainNode::Intersect) + } Relational::Except { .. } => { if let (Some(right), Some(left)) = (stack.pop(), stack.pop()) { current_node.children.push(left); diff --git a/sbroad-core/src/ir/helpers.rs b/sbroad-core/src/ir/helpers.rs index 40ebc4b8db..55b1f2b6d0 100644 --- a/sbroad-core/src/ir/helpers.rs +++ b/sbroad-core/src/ir/helpers.rs @@ -254,6 +254,7 @@ impl Plan { } Relational::Delete { .. } => writeln!(buf, "Delete")?, Relational::Insert { .. } => writeln!(buf, "Insert")?, + Relational::Intersect { .. } => writeln!(buf, "Intersect")?, Relational::Except { .. } => writeln!(buf, "Except")?, } // Print children. @@ -263,6 +264,7 @@ impl Plan { | Relational::Except { children, .. } | Relational::Delete { children, .. } | Relational::Insert { children, .. } + | Relational::Intersect { children, .. } | Relational::ScanSubQuery { children, .. } | Relational::Selection { children, .. } | Relational::Values { children, .. } @@ -291,6 +293,7 @@ impl Plan { | Relational::Except { output, .. } | Relational::Delete { output, .. } | Relational::Insert { output, .. } + | Relational::Intersect { output, .. } | Relational::Projection { output, .. } | Relational::ScanSubQuery { output, .. } | Relational::GroupBy { output, .. } diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs index a7fdc677d2..fb35e2740b 100644 --- a/sbroad-core/src/ir/operator.rs +++ b/sbroad-core/src/ir/operator.rs @@ -311,6 +311,12 @@ pub enum Relational { /// What to do in case there is a conflict during insert on storage conflict_strategy: ConflictStrategy, }, + Intersect { + // contains exactly 2 children + children: Vec<usize>, + // id of the output tuple + output: usize, + }, Update { /// Relation name. relation: String, @@ -495,6 +501,7 @@ impl Relational { | Relational::Join { output, .. } | Relational::Delete { output, .. } | Relational::Insert { output, .. } + | Relational::Intersect { output, .. } | Relational::Motion { output, .. } | Relational::Projection { output, .. } | Relational::ScanRelation { output, .. } @@ -517,6 +524,7 @@ impl Relational { | Relational::Join { output, .. } | Relational::Delete { output, .. } | Relational::Insert { output, .. } + | Relational::Intersect { output, .. } | Relational::Motion { output, .. } | Relational::Projection { output, .. } | Relational::ScanRelation { output, .. } @@ -539,6 +547,7 @@ impl Relational { | Relational::Having { children, .. } | Relational::Delete { children, .. } | Relational::Insert { children, .. } + | Relational::Intersect { children, .. } | Relational::Motion { children, .. } | Relational::Projection { children, .. } | Relational::ScanSubQuery { children, .. } @@ -575,6 +584,9 @@ impl Relational { | Relational::Insert { ref mut children, .. } + | Relational::Intersect { + ref mut children, .. + } | Relational::Motion { ref mut children, .. } @@ -658,6 +670,10 @@ impl Relational { children: ref mut old, .. } + | Relational::Intersect { + children: ref mut old, + .. + } | Relational::Motion { children: ref mut old, .. @@ -722,6 +738,7 @@ impl Relational { } => Ok(alias.as_deref().or(Some(relation.as_str()))), Relational::Projection { .. } | Relational::GroupBy { .. } + | Relational::Intersect { .. } | Relational::Having { .. } | Relational::Selection { .. } | Relational::Update { .. } @@ -776,6 +793,7 @@ impl Relational { Relational::Except { .. } => "Except", Relational::Delete { .. } => "Delete", Relational::Insert { .. } => "Insert", + Relational::Intersect { .. } => "Intersect", Relational::Update { .. } => "Update", Relational::Join { .. } => "Join", Relational::Motion { .. } => "Motion", diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index 7ed29cadb4..0b2fa50b69 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -6,6 +6,7 @@ use std::cmp::Ordering; use std::collections::{hash_map::Entry, HashMap, HashSet}; use crate::errors::{Action, Entity, SbroadError}; +use crate::frontend::sql::ir::SubtreeCloner; use crate::ir::distribution::{Distribution, Key, KeySet}; use crate::ir::expression::Expression; use crate::ir::operator::{Bool, JoinKind, Relational, Unary, UpdateStrategy}; @@ -1017,29 +1018,15 @@ impl Plan { )); } } - Relational::Except { .. } => { - let left_dist = self.get_distribution( - self.get_relational_output(self.get_relational_child(rel_id, 0)?)?, - )?; - let right_dist = self.get_distribution( - self.get_relational_output(self.get_relational_child(rel_id, 1)?)?, - )?; - if matches!( - (left_dist, right_dist), - (_, Distribution::Global) | (Distribution::Global, _) - ) { - return Err(SbroadError::UnsupportedOpForGlobalTables( - node.name().to_string(), - )); - } - } - Relational::Projection { .. } + Relational::Except { .. } + | Relational::Projection { .. } | Relational::GroupBy { .. } | Relational::Having { .. } | Relational::Join { .. } | Relational::ScanRelation { .. } | Relational::Selection { .. } | Relational::ValuesRow { .. } + | Relational::Intersect { .. } | Relational::ScanSubQuery { .. } | Relational::Motion { .. } | Relational::UnionAll { .. } => {} @@ -1763,117 +1750,155 @@ impl Plan { #[allow(clippy::too_many_lines)] fn resolve_except_conflicts(&mut self, rel_id: usize) -> Result<Strategy, SbroadError> { + if !matches!(self.get_relation_node(rel_id)?, Relational::Except { .. }) { + return Err(SbroadError::Invalid( + Entity::Relational, + Some("expected Except node".into()), + )); + } + let mut map = Strategy::new(rel_id); - match self.get_relation_node(rel_id)? { - Relational::Except { children, .. } => { - if let (Some(left), Some(right), None) = - (children.first(), children.get(1), children.get(2)) - { - let left_output_id = self.get_relation_node(*left)?.output(); - let right_output_id = self.get_relation_node(*right)?.output(); - let left_output_row = - self.get_expression_node(left_output_id)?.get_row_list()?; - let right_output_row = - self.get_expression_node(right_output_id)?.get_row_list()?; - if left_output_row.len() != right_output_row.len() { - return Err(SbroadError::UnexpectedNumberOfValues(format!( - "Except node children have different row lengths: left {}, right {}", - left_output_row.len(), - right_output_row.len() - ))); - } - let left_dist = self.get_distribution(left_output_id)?; - let right_dist = self.get_distribution(right_output_id)?; - match left_dist { - Distribution::Segment { - keys: left_keys, .. - } => { - if let Distribution::Segment { - keys: right_keys, .. - } = right_dist - { - // Distribution key sets have common keys, no need for the data motion. - if right_keys.intersection(left_keys).iter().next().is_some() { - return Ok(map); - } - } - let key = left_keys.iter().next().ok_or_else(|| SbroadError::Invalid( + + if self.resolve_except_global_vs_sharded(rel_id)? { + return Ok(map); + } + + let left_id = self.get_relational_child(rel_id, 0)?; + let right_id = self.get_relational_child(rel_id, 1)?; + let left_dist = self.get_rel_distribution(left_id)?; + let right_dist = self.get_rel_distribution(right_id)?; + + let (left_motion, right_motion) = match (left_dist, right_dist) { + ( + Distribution::Segment { keys: left_keys }, + Distribution::Segment { keys: right_keys }, + ) => { + if right_keys.intersection(left_keys).iter().next().is_some() { + // Distribution key sets have common keys, no need for the data motion. + (MotionPolicy::None, MotionPolicy::None) + } else { + let key = left_keys.iter().next().ok_or_else(|| SbroadError::Invalid( + Entity::Distribution, + Some("left child's segment distribution is invalid: no keys found in the set".into()), + ))?; + (MotionPolicy::None, MotionPolicy::Segment(key.into())) + } + } + ( + Distribution::Segment { .. } + | Distribution::Any + | Distribution::Global + | Distribution::Single, + Distribution::Global, + ) => (MotionPolicy::None, MotionPolicy::None), + (Distribution::Segment { keys }, _) => { + let key = keys.iter().next().ok_or_else(|| SbroadError::Invalid( Entity::Distribution, Some("left child's segment distribution is invalid: no keys found in the set".into()), ))?; - map.add_child( - *right, - MotionPolicy::Segment(key.into()), - Program::default(), - ); - } - Distribution::Single => { - match right_dist { - Distribution::Segment { keys: right_keys } => { - map.add_child( - *left, - MotionPolicy::Segment(MotionKey::from( - right_keys.iter().next().ok_or_else(|| { - SbroadError::Invalid( - Entity::Distribution, - Some(format!( - "{} {} {right}", - "Segment distribution with no keys.", - "Except right child:" - )), - ) - })?, - )), - Program::default(), - ); - map.add_child(*right, MotionPolicy::None, Program::default()); - } - Distribution::Single => { - // we could redistribute both children by any combination of columns, - // first column is used for simplicity - map.add_child( - *left, - MotionPolicy::Segment(MotionKey { - targets: vec![Target::Reference(0)], - }), - Program::default(), - ); - map.add_child( - *right, - MotionPolicy::Segment(MotionKey { - targets: vec![Target::Reference(0)], - }), - Program::default(), - ); - } - _ => { - // right child must to be broadcasted to each node - map.add_child( - *left, - MotionPolicy::Segment(MotionKey { - targets: vec![Target::Reference(0)], // any combination of columns would suffice - }), - Program::default(), - ); - map.add_child(*right, MotionPolicy::Full, Program::default()); - } - } - } - _ => { - map.add_child(*right, MotionPolicy::Full, Program::default()); - } - } - return Ok(map); - } - Err(SbroadError::UnexpectedNumberOfValues( - "Except node doesn't have exactly two children.".into(), - )) + (MotionPolicy::None, MotionPolicy::Segment(key.into())) } - _ => Err(SbroadError::Invalid( - Entity::Relational, - Some("expected Except node".into()), - )), + (Distribution::Single, Distribution::Single) => { + // we could redistribute both children by any combination of columns, + // first column is used for simplicity + let policy = MotionPolicy::Segment(MotionKey { + targets: vec![Target::Reference(0)], + }); + (policy.clone(), policy) + } + (Distribution::Single, Distribution::Segment { keys }) => { + let key = keys.iter().next().ok_or_else(|| { + SbroadError::Invalid( + Entity::Distribution, + Some(format!( + "{} {} {right_id}", + "Segment distribution with no keys.", "Except right child:" + )), + ) + })?; + (MotionPolicy::Segment(key.into()), MotionPolicy::None) + } + (Distribution::Global, Distribution::Single) => { + (MotionPolicy::None, MotionPolicy::None) + } + (_, _) => (MotionPolicy::None, MotionPolicy::Full), + }; + + map.add_child(left_id, left_motion, Program::default()); + map.add_child(right_id, right_motion, Program::default()); + + Ok(map) + } + + /// Resolves the case when left child has distribution Global + /// and right child has Any or Segment distribution. + /// If distributions are different returns `false`, otherwise modifies the + /// plan inserting motions (and other nodes) and returns `true`. + /// + /// Currently, the except is executed in two stages: + /// 1. Map stage: do intersect of the global child and sharded child + /// 2. Reduce stage: do except with global child and results from Map stage + /// + /// For example: + /// ```sql + /// select a from g + /// except + /// select b from segment_a + /// ``` + /// + /// Before transformation: + /// ```text + /// Except + /// Projection a + /// scan g + /// Projection b + /// scan segment_a + /// ``` + /// + /// Transforms into: + /// + /// ```text + /// Except + /// Projection a + /// scan g + /// Motion(Full) + /// Intersect + /// Projection b + /// scan segment_a + /// Projection a + /// scan g + /// ``` + fn resolve_except_global_vs_sharded(&mut self, except_id: usize) -> Result<bool, SbroadError> { + let left_id = self.get_relational_child(except_id, 0)?; + let right_id = self.get_relational_child(except_id, 1)?; + let left_dist = self.get_rel_distribution(left_id)?; + let right_dist = self.get_rel_distribution(right_id)?; + if !matches!( + (left_dist, right_dist), + ( + Distribution::Global, + Distribution::Any | Distribution::Segment { .. } + ) + ) { + return Ok(false); } + + let cloned_left_id = SubtreeCloner::clone_subtree(self, left_id, left_id)?; + let right_output_id = self.get_relational_output(right_id)?; + let intersect_output_id = self.clone_expr_subtree(right_output_id)?; + let intersect = Relational::Intersect { + children: vec![right_id, cloned_left_id], + output: intersect_output_id, + }; + let intersect_id = self.nodes.push(Node::Relational(intersect)); + + self.change_child(except_id, right_id, intersect_id)?; + + let mut map = Strategy::new(except_id); + map.add_child(intersect_id, MotionPolicy::Full, Program::default()); + self.create_motion_nodes(map)?; + + Ok(true) } fn resolve_union_conflicts(&mut self, rel_id: usize) -> Result<Strategy, SbroadError> { @@ -1980,6 +2005,7 @@ impl Plan { Relational::ScanRelation { output, .. } | Relational::ScanSubQuery { output, .. } | Relational::GroupBy { output, .. } + | Relational::Intersect { output, .. } | Relational::Having { output, .. } | Relational::ValuesRow { output, .. } => { self.set_distribution(output)?; diff --git a/sbroad-core/src/ir/tree/relation.rs b/sbroad-core/src/ir/tree/relation.rs index a87b32d271..c5366fb6ac 100644 --- a/sbroad-core/src/ir/tree/relation.rs +++ b/sbroad-core/src/ir/tree/relation.rs @@ -68,6 +68,7 @@ fn relational_next<'nodes>( Relational::Except { children, .. } | Relational::Join { children, .. } | Relational::Insert { children, .. } + | Relational::Intersect { children, .. } | Relational::Delete { children, .. } | Relational::Motion { children, .. } | Relational::Projection { children, .. } diff --git a/sbroad-core/src/ir/tree/subtree.rs b/sbroad-core/src/ir/tree/subtree.rs index ad2a43002c..c186097956 100644 --- a/sbroad-core/src/ir/tree/subtree.rs +++ b/sbroad-core/src/ir/tree/subtree.rs @@ -18,6 +18,7 @@ pub struct SubtreeIterator<'plan> { current: usize, child: RefCell<usize>, plan: &'plan Plan, + need_output: bool, } impl<'nodes> TreeIterator<'nodes> for SubtreeIterator<'nodes> { @@ -42,7 +43,7 @@ impl<'plan> PlanTreeIterator<'plan> for SubtreeIterator<'plan> { impl<'plan> SubtreePlanIterator<'plan> for SubtreeIterator<'plan> { fn need_output(&self) -> bool { - false + self.need_output } fn need_motion_subtree(&self) -> bool { @@ -60,11 +61,12 @@ impl<'plan> Iterator for SubtreeIterator<'plan> { impl<'plan> Plan { #[must_use] - pub fn subtree_iter(&'plan self, current: usize) -> SubtreeIterator<'plan> { + pub fn subtree_iter(&'plan self, current: usize, need_output: bool) -> SubtreeIterator<'plan> { SubtreeIterator { current, child: RefCell::new(0), plan: self, + need_output, } } } @@ -317,6 +319,9 @@ fn subtree_next<'plan>( | Relational::Insert { children, output, .. } + | Relational::Intersect { + children, output, .. + } | Relational::Delete { children, output, .. } diff --git a/sbroad-core/src/ir/tree/tests.rs b/sbroad-core/src/ir/tree/tests.rs index 85ae443a13..5252a26da0 100644 --- a/sbroad-core/src/ir/tree/tests.rs +++ b/sbroad-core/src/ir/tree/tests.rs @@ -240,7 +240,8 @@ fn subtree_dfs_post() { }; // Traverse relational nodes in the plan tree - let mut dft_post = PostOrder::with_capacity(|node| plan.subtree_iter(node), plan.next_id()); + let mut dft_post = + PostOrder::with_capacity(|node| plan.subtree_iter(node, false), plan.next_id()); let mut iter = dft_post.iter(top); assert_eq!(iter.next(), Some((3, *c_ref_id))); assert_eq!(iter.next(), Some((2, *alias_id))); -- GitLab