diff --git a/doc/design/global_tables_rfc.md b/doc/design/global_tables_rfc.md
index a8bddb919ad0f0e7c285be92f39aff6f3bb6e85e..050c2aa2ada053dc9c420f88aa26e4998d960882 100644
--- a/doc/design/global_tables_rfc.md
+++ b/doc/design/global_tables_rfc.md
@@ -588,6 +588,47 @@ flowchart LR
     Union --> |Any| Parent(parent)
 ```
 
+#### Arbitrary subtree with global distribution
+The primary key filter idea above does not work if the subtree with global distribution
+is more complex than a plain scan of a global table.
+
+For example:
+
+```sql
+(select g1.a, g2.b from
+global_1 as g1
+inner join
+global_2 as g2)
+union all
+select c, d from segment
+```
+Here the subtree is a join of two global tables, so it has `Global` distribution.
+
+It is also clear that we cannot push the primary key filter down to the global table scans,
+because there is an inner join above them.
+
+I suggest that we materialize the global subtree on a single storage only
+and do not scan the global tables on any other storage at all.
+
+Suppose the whole subtree with `union all` must be executed on three storages.
+Then:
+
+storage 1 and 2:
+```
+select c, d from segment
+```
+
+storage 3:
+```
+(select g1.a, g2.b from
+global_1 as g1
+inner join
+global_2 as g2)
+union all
+select c, d from segment
+```
+
+
 ## Except
 
diff --git a/sbroad-cartridge/src/cartridge/router.rs b/sbroad-cartridge/src/cartridge/router.rs
index ed40c095817ffde5ebe81f4d0ecf62357c002ddd..5600d58f32d90b84f0acdd7f51aca2fcf9aa5336 100644
--- a/sbroad-cartridge/src/cartridge/router.rs
+++ b/sbroad-cartridge/src/cartridge/router.rs
@@ -24,7 +24,7 @@ use sbroad::errors::{Action, Entity, SbroadError};
 use sbroad::executor::bucket::Buckets;
 use sbroad::executor::engine::{
     helpers::{
-        dispatch, explain_format, materialize_motion, normalize_name_from_schema,
+        dispatch_impl, explain_format, materialize_motion, normalize_name_from_schema,
         sharding_key_from_map, sharding_key_from_tuple,
     },
     Router, Statistics,
@@ -234,7 +234,7 @@ impl Router for RouterRuntime {
         top_id: usize,
         buckets: &Buckets,
     ) -> Result<Box<dyn Any>, SbroadError> {
-        dispatch(self, plan, top_id, buckets)
+        dispatch_impl(self, plan, top_id, buckets)
     }
 
     fn explain_format(&self, explain: String) -> Result<Box<dyn Any>, SbroadError> {
diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs
index 2cc29598598b6ef7c876d1dd2783cc684aa0516c..7719b799c2f2ca9bd3e975e75795be831804fb62 100644
--- a/sbroad-core/src/backend/sql/ir.rs
+++ b/sbroad-core/src/backend/sql/ir.rs
@@ -281,6 +281,7 @@ impl ExecutionPlan {
                 SyntaxData::Comma => sql.push(','),
                 SyntaxData::Condition => sql.push_str("ON"),
                 SyntaxData::Distinct => sql.push_str("DISTINCT"),
+                SyntaxData::Inline(content) => sql.push_str(content),
                 SyntaxData::From => sql.push_str("FROM"),
                 SyntaxData::Operator(s) => sql.push_str(s.as_str()),
                 SyntaxData::OpenParenthesis => sql.push('('),
diff --git a/sbroad-core/src/backend/sql/tree.rs b/sbroad-core/src/backend/sql/tree.rs
index 66fe5e815959eac83758461b5e3ba90b399ec9b8..8c32f9caa63d4c0d49138be8cfef27baae1c2616 100644
--- a/sbroad-core/src/backend/sql/tree.rs
+++ b/sbroad-core/src/backend/sql/tree.rs
@@ -8,7 +8,7 @@ use crate::errors::{Action, Entity, SbroadError};
 use crate::executor::ir::ExecutionPlan;
 use crate::ir::expression::Expression;
 use crate::ir::operator::{Bool, Relational, Unary};
-use crate::ir::transformation::redistribution::MotionPolicy;
+use crate::ir::transformation::redistribution::{MotionOpcode, MotionPolicy};
 use crate::ir::tree::traversal::PostOrder;
use crate::ir::tree::Snapshot; use crate::ir::Node; @@ -32,6 +32,8 @@ pub enum SyntaxData { Condition, /// "distinct" Distinct, + /// Inline sql string + Inline(String), /// "from" From, /// "(" @@ -131,6 +133,14 @@ impl SyntaxNode { } } + fn new_inline(value: &str) -> Self { + SyntaxNode { + data: SyntaxData::Inline(value.into()), + left: None, + right: Vec::new(), + } + } + fn new_from() -> Self { SyntaxNode { data: SyntaxData::From, @@ -809,6 +819,8 @@ impl<'p> SyntaxPlan<'p> { policy, children, is_child_subquery, + program, + output, .. } => { if let MotionPolicy::LocalSegment { .. } = policy { @@ -837,6 +849,38 @@ impl<'p> SyntaxPlan<'p> { )); } } + if let Some(op) = program + .0 + .iter() + .find(|op| matches!(op, MotionOpcode::SerializeAsEmptyTable(_))) + { + let is_enabled = matches!(op, MotionOpcode::SerializeAsEmptyTable(true)); + if is_enabled { + let output_len = self.plan.get_ir_plan().get_row_list(*output)?.len(); + let empty_select = format!( + "select {}null where false", + "null, ".repeat(output_len - 1) + ); + let inline_id = self + .nodes + .push_syntax_node(SyntaxNode::new_inline(&empty_select)); + let pointer_id = self.nodes.push_syntax_node(SyntaxNode::new_pointer( + id, + None, + vec![inline_id], + )); + return Ok(pointer_id); + } + let child_plan_id = self.plan.get_ir_plan().get_relational_child(id, 0)?; + let child_sp_id = *self.nodes.map.get(&child_plan_id).ok_or_else(|| { + SbroadError::Invalid( + Entity::SyntaxPlan, + Some(format!("motion child {child_plan_id} is not found in map")), + ) + })?; + self.nodes.map.insert(id, child_sp_id); + return Ok(child_sp_id); + } let vtable = self.plan.get_motion_vtable(id)?; let vtable_alias = vtable.get_alias().map(String::from); diff --git a/sbroad-core/src/executor/engine/helpers.rs b/sbroad-core/src/executor/engine/helpers.rs index 2553ccd557f92fabebb1bcc42eab97562ca5b6d1..0206c889ac6c422e24a3a1a25cd3160ed2971004 100644 --- a/sbroad-core/src/executor/engine/helpers.rs +++ b/sbroad-core/src/executor/engine/helpers.rs @@ -607,7 +607,7 @@ pub fn explain_format(explain: &str) -> Result<Box<dyn Any>, SbroadError> { /// /// # Errors /// - Internal errors during the execution. -pub fn dispatch( +pub fn dispatch_impl( runtime: &impl Vshard, plan: &mut ExecutionPlan, top_id: usize, @@ -618,13 +618,35 @@ pub fn dispatch( &format!("dispatching plan: {plan:?}") ); let sub_plan = plan.take_subtree(top_id)?; + debug!(Option::from("dispatch"), &format!("sub plan: {sub_plan:?}")); + dispatch_by_buckets(sub_plan, buckets, runtime) +} + +/// Helper function that chooses one of the methods for execution +/// based on buckets. +/// +/// # Errors +/// - Failed to dispatch +pub fn dispatch_by_buckets( + sub_plan: ExecutionPlan, + buckets: &Buckets, + runtime: &impl Vshard, +) -> Result<Box<dyn Any>, SbroadError> { let query_type = sub_plan.query_type()?; let conn_type = sub_plan.connection_type()?; - debug!(Option::from("dispatch"), &format!("sub plan: {sub_plan:?}")); match buckets { Buckets::Filtered(_) => runtime.exec_ir_on_some(sub_plan, buckets), Buckets::Any => { + if sub_plan.has_customization_opcodes() { + return Err(SbroadError::Invalid( + Entity::SubTree, + Some( + "plan customization is needed only when executing on multiple replicasets" + .into(), + ), + )); + } // Check that all vtables don't have index. 
Because if they do, // they will be filtered later by filter_vtable if let Some(vtables) = &sub_plan.vtables { @@ -640,7 +662,7 @@ pub fn dispatch( runtime.exec_ir_on_any_node(sub_plan) } Buckets::All => { - if sub_plan.has_segmented_tables() { + if sub_plan.has_segmented_tables() || sub_plan.has_customization_opcodes() { let bucket_set: HashSet<u64, RepeatableState> = (1..=runtime.bucket_count()).collect(); let all_buckets = Buckets::new_filtered(bucket_set); diff --git a/sbroad-core/src/executor/engine/helpers/vshard.rs b/sbroad-core/src/executor/engine/helpers/vshard.rs index a930c0b4710933cef994b004f37438a6a241a98a..87acb110d9bccfaf8d12e9a6d89b4a3d29547c9e 100644 --- a/sbroad-core/src/executor/engine/helpers/vshard.rs +++ b/sbroad-core/src/executor/engine/helpers/vshard.rs @@ -3,7 +3,20 @@ use std::{ collections::{HashMap, HashSet}, }; -use crate::{executor::engine::Router, ir::helpers::RepeatableState, otm::child_span}; +use crate::{ + executor::engine::Router, + ir::{ + helpers::RepeatableState, + operator::Relational, + transformation::redistribution::{MotionOpcode, MotionPolicy}, + tree::{ + relation::RelationalIterator, + traversal::{PostOrderWithFilter, REL_CAPACITY}, + }, + Node, Plan, + }, + otm::child_span, +}; use rand::{thread_rng, Rng}; use sbroad_proc::otm_child_span; use tarantool::{tlua::LuaFunction, tuple::Tuple}; @@ -171,7 +184,7 @@ pub fn exec_ir_on_all_buckets( #[otm_child_span("query.dispatch.cartridge.some")] pub fn exec_ir_on_some_buckets( runtime: &(impl Router + Vshard), - mut sub_plan: ExecutionPlan, + sub_plan: ExecutionPlan, buckets: &Buckets, ) -> Result<Box<dyn Any>, SbroadError> { let query_type = sub_plan.query_type()?; @@ -194,41 +207,208 @@ pub fn exec_ir_on_some_buckets( } } - let mut rs_ir: ReplicasetMessage = HashMap::new(); + // todo(ars): group should be a runtime function not global, + // this way we could implement it for mock runtime for better testing let rs_bucket_vec: Vec<(String, Vec<u64>)> = group(buckets)?.drain().collect(); if rs_bucket_vec.is_empty() { return Err(SbroadError::UnexpectedNumberOfValues(format!( "no replica sets were found for the buckets {buckets:?} to execute the query on" ))); } + let rs_ir = prepare_rs_to_ir_map(&rs_bucket_vec, sub_plan)?; + let mut rs_message = HashMap::with_capacity(rs_ir.len()); + for (rs, ir) in rs_ir { + rs_message.insert(rs, Message::from(encode_plan(ir)?)); + } + match &query_type { + QueryType::DQL => dql_on_some( + &*runtime.metadata()?, + rs_message, + conn_type.is_readonly(), + vtable_max_rows, + ), + QueryType::DML => dml_on_some(&*runtime.metadata()?, rs_message, conn_type.is_readonly()), + } +} + +// Helper struct to hold information +// needed to apply SerializeAsEmpty opcode +// to subtree. +struct SerializeAsEmptyInfo { + // ids of topmost motion nodes which have this opcode + // with `true` value + top_motion_ids: Vec<usize>, + // ids of motions which have this opcode + target_motion_ids: Vec<usize>, + // ids of motions that are located below + // top_motion_id, vtables corresponding + // to those motions must be deleted from + // replicaset message. + unused_motions: Vec<usize>, +} + +impl Plan { + // return true if given node is Motion containing seriliaze as empty + // opcode. If `check_enabled` is true checks that the opcode is enabled. + fn is_serialize_as_empty_motion(&self, node_id: usize, check_enabled: bool) -> bool { + if let Ok(Node::Relational(Relational::Motion { program, .. 
})) = self.get_node(node_id) { + if let Some(op) = program + .0 + .iter() + .find(|op| matches!(op, MotionOpcode::SerializeAsEmptyTable(_))) + { + return !check_enabled || matches!(op, MotionOpcode::SerializeAsEmptyTable(true)); + }; + } + false + } + + fn collect_top_ids(&self) -> Result<Vec<usize>, SbroadError> { + let mut stop_nodes: HashSet<usize> = HashSet::new(); + let iter_children = |node_id| -> RelationalIterator<'_> { + if self.is_serialize_as_empty_motion(node_id, true) { + stop_nodes.insert(node_id); + } + // do not traverse subtree with this child + if stop_nodes.contains(&node_id) { + return self.nodes.empty_rel_iter(); + } + self.nodes.rel_iter(node_id) + }; + let filter = |node_id: usize| -> bool { self.is_serialize_as_empty_motion(node_id, true) }; + let mut dfs = PostOrderWithFilter::with_capacity(iter_children, 4, Box::new(filter)); + + Ok(dfs.iter(self.get_top()?).map(|(_, id)| id).collect()) + } + + fn serialize_as_empty_info(&self) -> Result<Option<SerializeAsEmptyInfo>, SbroadError> { + let top_ids = self.collect_top_ids()?; + if top_ids.is_empty() { + return Ok(None); + } + + // all motion nodes that are inside the subtrees + // defined by `top_ids` + let all_motion_nodes = { + let is_motion = |node_id: usize| -> bool { + matches!( + self.get_node(node_id), + Ok(Node::Relational(Relational::Motion { .. })) + ) + }; + let mut all_motions = Vec::new(); + for top_id in &top_ids { + let mut dfs = PostOrderWithFilter::with_capacity( + |x| self.nodes.rel_iter(x), + REL_CAPACITY, + Box::new(is_motion), + ); + all_motions.extend(dfs.iter(*top_id).map(|(_, id)| id)); + } + all_motions + }; + let mut target_motions = Vec::new(); + let mut unused_motions = Vec::new(); + for id in all_motion_nodes { + if self.is_serialize_as_empty_motion(id, false) { + target_motions.push(id); + } else { + unused_motions.push(id); + } + } + + Ok(Some(SerializeAsEmptyInfo { + top_motion_ids: top_ids, + target_motion_ids: target_motions, + unused_motions, + })) + } +} + +/// Prepares execution plan for each replicaset. +/// +/// # Errors +/// - Failed to apply customization opcodes +/// - Failed to filter vtable +pub fn prepare_rs_to_ir_map( + rs_bucket_vec: &Vec<(String, Vec<u64>)>, + mut sub_plan: ExecutionPlan, +) -> Result<HashMap<String, ExecutionPlan>, SbroadError> { + let mut rs_ir: HashMap<String, ExecutionPlan> = HashMap::new(); rs_ir.reserve(rs_bucket_vec.len()); - // We split last pair in order not to call extra `clone` method. 
if let Some((last, other)) = rs_bucket_vec.split_last() { + let sae_info = sub_plan.get_ir_plan().serialize_as_empty_info()?; for (rs, bucket_ids) in other { let mut rs_plan = sub_plan.clone(); + if let Some(ref info) = sae_info { + apply_serialize_as_empty_opcode(&mut rs_plan, info)?; + } filter_vtable(&mut rs_plan, bucket_ids)?; - rs_ir.insert(rs.clone(), Message::from(encode_plan(rs_plan)?)); + rs_ir.insert(rs.clone(), rs_plan); } + if let Some(ref info) = sae_info { + disable_serialize_as_empty_opcode(&mut sub_plan, info)?; + } let (rs, bucket_ids) = last; filter_vtable(&mut sub_plan, bucket_ids)?; - rs_ir.insert(rs.clone(), Message::from(encode_plan(sub_plan)?)); + rs_ir.insert(rs.clone(), sub_plan); } - match &query_type { - QueryType::DQL => dql_on_some( - &*runtime.metadata()?, - rs_ir, - conn_type.is_readonly(), - vtable_max_rows, - ), - QueryType::DML => dml_on_some(&*runtime.metadata()?, rs_ir, conn_type.is_readonly()), + + Ok(rs_ir) +} + +fn apply_serialize_as_empty_opcode( + sub_plan: &mut ExecutionPlan, + info: &SerializeAsEmptyInfo, +) -> Result<(), SbroadError> { + if let Some(vtables_map) = sub_plan.get_mut_vtables() { + for motion_id in &info.unused_motions { + vtables_map.remove(motion_id); + } + } + + for top_id in &info.top_motion_ids { + sub_plan.unlink_motion_subtree(*top_id)?; } + Ok(()) +} + +fn disable_serialize_as_empty_opcode( + sub_plan: &mut ExecutionPlan, + info: &SerializeAsEmptyInfo, +) -> Result<(), SbroadError> { + for motion_id in &info.target_motion_ids { + let program = if let Relational::Motion { + policy, program, .. + } = sub_plan + .get_mut_ir_plan() + .get_mut_relation_node(*motion_id)? + { + if !matches!(policy, MotionPolicy::Local) { + continue; + } + program + } else { + return Err(SbroadError::Invalid( + Entity::Node, + Some(format!("expected motion node on id {motion_id}")), + )); + }; + for op in &mut program.0 { + if let MotionOpcode::SerializeAsEmptyTable(enabled) = op { + *enabled = false; + } + } + } + + Ok(()) } /// Map between replicaset uuid and the set of buckets (their ids) which correspond to that replicaset. /// This set is defined by vshard `router.route` function call. See `group_buckets_by_replicasets` /// function for more details. -type GroupedBuckets = HashMap<String, Vec<u64>>; +pub(crate) type GroupedBuckets = HashMap<String, Vec<u64>>; /// Function that transforms `Buckets` (set of bucket_ids) /// into `GroupedBuckets` (map from replicaset uuid to set of bucket_ids). diff --git a/sbroad-core/src/executor/engine/mock.rs b/sbroad-core/src/executor/engine/mock.rs index 630e7371d6d72f94ed191911901e8b0e469d20fe..518ae9b9f877ab99a8d08e0e93c4be506e649996 100644 --- a/sbroad-core/src/executor/engine/mock.rs +++ b/sbroad-core/src/executor/engine/mock.rs @@ -1,5 +1,7 @@ use std::any::Any; use std::cell::{Ref, RefCell}; +use std::cmp::Ordering; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::rc::Rc; @@ -33,7 +35,8 @@ use crate::ir::tree::Snapshot; use crate::ir::value::{LuaValue, Value}; use crate::ir::Plan; -use super::helpers::normalize_name_from_sql; +use super::helpers::vshard::{prepare_rs_to_ir_map, GroupedBuckets}; +use super::helpers::{dispatch_by_buckets, normalize_name_from_sql}; use super::{Metadata, QueryCache}; #[allow(clippy::module_name_repetitions)] @@ -865,6 +868,99 @@ impl RouterConfigurationMock { } } +/// Helper struct to group buckets by replicasets. 
+/// Assumes that all buckets are uniformly distributed +/// between replicasets: first rs holds p buckets, +/// second rs holds p buckets, .., last rs holds p + r +/// buckets. +/// Where: `p = bucket_cnt / rs_cnt, r = bucket_cnt % rs_cnt` +#[allow(clippy::module_name_repetitions)] +pub struct VshardMock { + // Holds boundaries of replicaset buckets: [start, end) + blocks: Vec<(u64, u64)>, +} + +impl VshardMock { + #[must_use] + pub fn new(rs_count: usize, bucket_count: u64) -> Self { + let mut blocks = Vec::new(); + let rs_count: u64 = rs_count as u64; + let buckets_per_rs = bucket_count / rs_count; + let remainder = bucket_count % rs_count; + for rs_idx in 0..rs_count { + let start = rs_idx * buckets_per_rs; + let end = start + buckets_per_rs; + blocks.push((start, end)); + } + if let Some(last_block) = blocks.last_mut() { + last_block.1 += remainder + 1; + } + Self { blocks } + } + + #[must_use] + #[allow(clippy::missing_panics_doc)] + pub fn group(&self, buckets: &Buckets) -> GroupedBuckets { + let mut res: GroupedBuckets = HashMap::new(); + match buckets { + Buckets::All => { + for (idx, (start, end)) in self.blocks.iter().enumerate() { + let name = Self::generate_rs_name(idx); + res.insert(name, ((*start)..(*end)).collect()); + } + } + Buckets::Filtered(buckets_set) => { + for bucket_id in buckets_set { + let comparator = |block: &(u64, u64)| -> Ordering { + let start = block.0; + let end = block.1; + if *bucket_id < start { + Ordering::Greater + } else if *bucket_id >= end { + Ordering::Less + } else { + Ordering::Equal + } + }; + let block_idx = match self.blocks.binary_search_by(comparator) { + Ok(idx) => idx, + Err(idx) => { + panic!("bucket_id: {bucket_id}, err_idx: {idx}"); + } + }; + let name = Self::generate_rs_name(block_idx); + match res.entry(name) { + Entry::Occupied(mut e) => { + e.get_mut().push(*bucket_id); + } + Entry::Vacant(e) => { + e.insert(vec![*bucket_id]); + } + } + } + } + Buckets::Any => { + res.insert(Self::generate_rs_name(0), vec![0]); + } + } + + res + } + + #[must_use] + pub fn generate_rs_name(idx: usize) -> String { + format!("replicaset_{idx}") + } + + #[must_use] + #[allow(clippy::missing_panics_doc)] + pub fn get_id(name: &str) -> usize { + name[name.find('_').unwrap() + 1..] 
+ .parse::<usize>() + .unwrap() + } +} + #[allow(clippy::module_name_repetitions)] pub struct RouterRuntimeMock { metadata: RefCell<RouterConfigurationMock>, @@ -872,6 +968,7 @@ pub struct RouterRuntimeMock { ir_cache: RefCell<LRUCache<String, Plan>>, table_statistics_cache: RefCell<HashMap<String, Rc<TableStats>>>, initial_column_statistics_cache: RefCell<HashMap<TableColumnPair, Rc<Box<dyn Any>>>>, + pub vshard_mock: VshardMock, } impl std::fmt::Debug for RouterRuntimeMock { @@ -975,13 +1072,10 @@ impl Vshard for RouterRuntimeMock { fn exec_ir_on_some( &self, - _sub_plan: ExecutionPlan, - _buckets: &Buckets, + sub_plan: ExecutionPlan, + buckets: &Buckets, ) -> Result<Box<dyn Any>, SbroadError> { - Err(SbroadError::Unsupported( - Entity::Runtime, - Some("exec_ir_on_some is not supported for the mock runtime".to_string()), - )) + mock_exec_ir_on_some(&self.vshard_mock, buckets, sub_plan) } } @@ -1021,16 +1115,32 @@ impl Vshard for &RouterRuntimeMock { fn exec_ir_on_some( &self, - _sub_plan: ExecutionPlan, - _buckets: &Buckets, + sub_plan: ExecutionPlan, + buckets: &Buckets, ) -> Result<Box<dyn Any>, SbroadError> { - Err(SbroadError::Unsupported( - Entity::Runtime, - Some("exec_ir_on_some is not supported for the mock runtime".to_string()), - )) + mock_exec_ir_on_some(&self.vshard_mock, buckets, sub_plan) } } +fn mock_exec_ir_on_some( + vshard_mock: &VshardMock, + buckets: &Buckets, + sub_plan: ExecutionPlan, +) -> Result<Box<dyn Any>, SbroadError> { + let mut rs_bucket_vec: Vec<(String, Vec<u64>)> = vshard_mock.group(buckets).drain().collect(); + // sort to get deterministic test results + rs_bucket_vec.sort_by_key(|(rs_name, _)| rs_name.clone()); + let rs_ir = prepare_rs_to_ir_map(&rs_bucket_vec, sub_plan)?; + let mut dispatch_vec: Vec<ReplicasetDispatchInfo> = Vec::new(); + for (rs_name, exec_plan) in rs_ir { + let id = VshardMock::get_id(&rs_name); + let dispatch = ReplicasetDispatchInfo::new(id, &exec_plan); + dispatch_vec.push(dispatch); + } + dispatch_vec.sort_by_key(|d| d.rs_id); + Ok(Box::new(dispatch_vec)) +} + impl Default for RouterRuntimeMock { fn default() -> Self { Self::new() @@ -1338,12 +1448,15 @@ impl RouterRuntimeMock { Rc::new(boxed_column_stats), ); + let meta = RouterConfigurationMock::new(); + let bucket_cnt = meta.bucket_count; RouterRuntimeMock { - metadata: RefCell::new(RouterConfigurationMock::new()), + metadata: RefCell::new(meta), virtual_tables: RefCell::new(HashMap::new()), ir_cache: RefCell::new(cache), table_statistics_cache: RefCell::new(table_statistics_cache), initial_column_statistics_cache: RefCell::new(column_statistics_cache), + vshard_mock: VshardMock::new(2, bucket_cnt), } } @@ -1553,3 +1666,58 @@ fn exec_on_all(query: &str) -> ProducerResult { result } + +#[derive(Debug, PartialEq, Clone)] +pub struct ReplicasetDispatchInfo { + pub rs_id: usize, + pub pattern: String, + pub params: Vec<Value>, + pub vtables_map: HashMap<usize, Rc<VirtualTable>>, +} + +impl ReplicasetDispatchInfo { + #[must_use] + #[allow(clippy::missing_panics_doc)] + pub fn new(rs_id: usize, exec_plan: &ExecutionPlan) -> Self { + let top = exec_plan.get_ir_plan().get_top().unwrap(); + let sp = SyntaxPlan::new(exec_plan, top, Snapshot::Oldest).unwrap(); + let ordered_sn = OrderedSyntaxNodes::try_from(sp).unwrap(); + let syntax_data_nodes = ordered_sn.to_syntax_data().unwrap(); + let (pattern_with_params, _) = exec_plan + .to_sql(&syntax_data_nodes, &Buckets::All, "test") + .unwrap(); + let mut vtables: HashMap<usize, Rc<VirtualTable>> = HashMap::new(); + if let Some(vtables_map) = 
exec_plan.get_vtables() { + vtables = vtables_map.clone(); + } + Self { + rs_id, + pattern: pattern_with_params.pattern, + params: pattern_with_params.params, + vtables_map: vtables, + } + } +} + +impl RouterRuntimeMock { + pub fn set_vshard_mock(&mut self, rs_count: usize) { + self.vshard_mock = VshardMock::new(rs_count, self.bucket_count()); + } + + /// Imitates the real pipeline of dispatching plan subtree + /// on the given buckets. But does not encode plan into + /// message for sending, instead it evalutes what sql + /// query will be executed on each replicaset, and what vtables + /// will be send to that replicaset. + #[allow(clippy::missing_panics_doc)] + pub fn detailed_dispatch( + &self, + plan: ExecutionPlan, + buckets: &Buckets, + ) -> Vec<ReplicasetDispatchInfo> { + *dispatch_by_buckets(plan, buckets, self) + .unwrap() + .downcast::<Vec<ReplicasetDispatchInfo>>() + .unwrap() + } +} diff --git a/sbroad-core/src/executor/ir.rs b/sbroad-core/src/executor/ir.rs index 33bcfa84e2213e97a326d11703b3bfebfbcf1e6a..db8ec751e9684bfca688ac89a71e287ca85b9c24 100644 --- a/sbroad-core/src/executor/ir.rs +++ b/sbroad-core/src/executor/ir.rs @@ -185,6 +185,7 @@ impl ExecutionPlan { })?; vtable.add_missing_rows(from_vtable)?; } + MotionOpcode::SerializeAsEmptyTable(_) => {} } } @@ -210,6 +211,29 @@ impl ExecutionPlan { }) } + /// Return true if plan needs to be customized for each storage. + /// I.e we can't send the same plan to all storages. + /// + /// The check is done by iterating over the plan nodes arena, + /// and checking whether motion node contains serialize as empty + /// opcode. Be sure there are no dead nodes in the plan arena: + /// nodes that are not referenced by actual plan tree. + #[must_use] + pub fn has_customization_opcodes(&self) -> bool { + for node in self.get_ir_plan().nodes.iter() { + if let Node::Relational(Relational::Motion { program, .. 
}) = node { + if program + .0 + .iter() + .any(|op| matches!(op, MotionOpcode::SerializeAsEmptyTable(_))) + { + return true; + } + } + } + false + } + /// Extract policy from motion node /// /// # Errors diff --git a/sbroad-core/src/executor/tests/exec_plan.rs b/sbroad-core/src/executor/tests/exec_plan.rs index 808ba57ba85c4945a09bcf33ce9f5937a8bbda37..47efc30a86b7a3d30441cdd9383ed746fc80389a 100644 --- a/sbroad-core/src/executor/tests/exec_plan.rs +++ b/sbroad-core/src/executor/tests/exec_plan.rs @@ -1,12 +1,15 @@ +use itertools::Itertools; use pretty_assertions::assert_eq; use crate::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan}; -use crate::executor::engine::mock::RouterRuntimeMock; +use crate::collection; +use crate::executor::engine::helpers::filter_vtable; +use crate::executor::engine::mock::{ReplicasetDispatchInfo, RouterRuntimeMock, VshardMock}; use crate::ir::relation::Type; use crate::ir::tests::{column_integer_user_non_null, column_user_non_null}; use crate::ir::transformation::redistribution::MotionPolicy; use crate::ir::tree::Snapshot; -use crate::ir::Slice; +use crate::ir::{Node, Slice}; use super::*; @@ -806,3 +809,280 @@ fn global_table_scan() { ) ); } + +#[test] +fn global_union_all() { + let sql = r#"SELECT "a", "b" from "global_t" union all select "e", "f" from "t2""#; + let mut coordinator = RouterRuntimeMock::new(); + coordinator.set_vshard_mock(2); + + let mut query = Query::new(&coordinator, sql, vec![]).unwrap(); + let exec_plan = query.get_mut_exec_plan(); + + let top_id = exec_plan.get_ir_plan().get_top().unwrap(); + let buckets = query.bucket_discovery(top_id).unwrap(); + assert_eq!(Buckets::All, buckets); + let exec_plan = query.get_mut_exec_plan(); + let sub_plan = exec_plan.take_subtree(top_id).unwrap(); + let actual_dispatch = coordinator.detailed_dispatch(sub_plan, &buckets); + let expected = vec![ + ReplicasetDispatchInfo { + rs_id: 0, + pattern: r#" select null, null where false UNION ALL SELECT "t2"."e", "t2"."f" FROM "t2""#.to_string(), + params: vec![], + vtables_map: HashMap::new(), + }, + ReplicasetDispatchInfo { + rs_id: 1, + pattern: r#"SELECT "global_t"."a", "global_t"."b" FROM "global_t" UNION ALL SELECT "t2"."e", "t2"."f" FROM "t2""#.to_string(), + params: vec![], + vtables_map: HashMap::new(), + }, + ]; + + assert_eq!(expected, actual_dispatch); +} + +#[test] +fn global_union_all2() { + // check that we don't send virtual table to replicasets, where + // global child is not materialized. 
+ let sql = r#"SELECT "a", "b" from "global_t" where "b" + in (select "e" from "t2") + union all select "e", "f" from "t2""#; + let mut coordinator = RouterRuntimeMock::new(); + coordinator.set_vshard_mock(3); + + let mut query = Query::new(&coordinator, sql, vec![]).unwrap(); + + let motion_id = *query + .exec_plan + .get_ir_plan() + .clone_slices() + .slice(0) + .unwrap() + .position(0) + .unwrap(); + let motion_child = query.exec_plan.get_motion_subtree_root(motion_id).unwrap(); + // imitate plan execution + query.exec_plan.take_subtree(motion_child).unwrap(); + + let mut virtual_table = VirtualTable::new(); + virtual_table.add_column(column_integer_user_non_null(String::from("e"))); + virtual_table.add_tuple(vec![Value::Integer(1)]); + let mut exec_plan = query.get_mut_exec_plan(); + exec_plan + .set_motion_vtable(motion_id, virtual_table.clone(), &coordinator) + .unwrap(); + + let top_id = exec_plan.get_ir_plan().get_top().unwrap(); + let buckets = query.bucket_discovery(top_id).unwrap(); + assert_eq!(Buckets::All, buckets); + let exec_plan = query.get_mut_exec_plan(); + + let sub_plan = exec_plan.take_subtree(top_id).unwrap(); + // after take subtree motion id has changed + let (motion_id, _) = sub_plan + .get_ir_plan() + .nodes + .iter() + .find_position(|n| { + matches!( + n, + Node::Relational(Relational::Motion { + policy: MotionPolicy::Full, + .. + }) + ) + }) + .unwrap(); + + let actual_dispatch = coordinator.detailed_dispatch(sub_plan, &buckets); + + let expected = vec![ + ReplicasetDispatchInfo { + rs_id: 0, + pattern: r#" select null, null where false UNION ALL SELECT "t2"."e", "t2"."f" FROM "t2""#.to_string(), + params: vec![], + vtables_map: HashMap::new(), + }, + ReplicasetDispatchInfo { + rs_id: 1, + pattern: r#" select null, null where false UNION ALL SELECT "t2"."e", "t2"."f" FROM "t2""#.to_string(), + params: vec![], + vtables_map: HashMap::new(), + }, + ReplicasetDispatchInfo { + rs_id: 2, + pattern: r#"SELECT "global_t"."a", "global_t"."b" FROM "global_t" WHERE ("global_t"."b") in (SELECT "e" FROM "TMP_test_16") UNION ALL SELECT "t2"."e", "t2"."f" FROM "t2""#.to_string(), + params: vec![], + vtables_map: collection!(motion_id => Rc::new(virtual_table)), + }, + ]; + + assert_eq!(expected, actual_dispatch); +} + +#[test] +fn global_union_all3() { + // also check that we don't delete vtables, + // from other subtree + let sql = r#" + select "a" from "global_t" where "b" + in (select "e" from "t2") + union all + select "f" from "t2" + group by "f""#; + let mut coordinator = RouterRuntimeMock::new(); + coordinator.set_vshard_mock(2); + + let mut query = Query::new(&coordinator, sql, vec![]).unwrap(); + + let slices = query.exec_plan.get_ir_plan().clone_slices(); + let sq_motion_id = *slices.slice(0).unwrap().position(0).unwrap(); + let sq_motion_child = query + .exec_plan + .get_motion_subtree_root(sq_motion_id) + .unwrap(); + + // imitate plan execution + query.exec_plan.take_subtree(sq_motion_child).unwrap(); + + let mut sq_vtable = VirtualTable::new(); + sq_vtable.add_column(column_integer_user_non_null(String::from("f"))); + sq_vtable.add_tuple(vec![Value::Integer(1)]); + query + .exec_plan + .set_motion_vtable(sq_motion_id, sq_vtable.clone(), &coordinator) + .unwrap(); + + let groupby_motion_id = *slices.slice(1).unwrap().position(0).unwrap(); + let groupby_motion_child = query + .exec_plan + .get_motion_subtree_root(groupby_motion_id) + .unwrap(); + query.exec_plan.take_subtree(groupby_motion_child).unwrap(); + + let mut groupby_vtable = VirtualTable::new(); + // 
these tuples must belong to different replicasets + let tuple1 = vec![Value::Integer(3)]; + let tuple2 = vec![Value::Integer(2929)]; + groupby_vtable.add_column(column_integer_user_non_null(String::from("column_51"))); + groupby_vtable.add_tuple(tuple1.clone()); + groupby_vtable.add_tuple(tuple2.clone()); + if let MotionPolicy::Segment(key) = + get_motion_policy(query.exec_plan.get_ir_plan(), groupby_motion_id) + { + groupby_vtable.reshard(key, &query.coordinator).unwrap(); + } + query + .exec_plan + .set_motion_vtable(groupby_motion_id, groupby_vtable.clone(), &coordinator) + .unwrap(); + + let top_id = query.exec_plan.get_ir_plan().get_top().unwrap(); + let buckets = query.bucket_discovery(top_id).unwrap(); + let bucket1 = coordinator.determine_bucket_id(&[&tuple1[0]]).unwrap(); + let bucket2 = coordinator.determine_bucket_id(&[&tuple2[0]]).unwrap(); + let expected_buckets = Buckets::Filtered(collection!(bucket1, bucket2)); + assert_eq!(expected_buckets, buckets); + + let grouped = coordinator.vshard_mock.group(&buckets); + let mut rs_buckets: Vec<(usize, Vec<u64>)> = grouped + .into_iter() + .map(|(k, v)| (VshardMock::get_id(&k), v)) + .collect(); + rs_buckets.sort_by_key(|(id, _)| *id); + let groupby_vtable1 = groupby_vtable.new_with_buckets(&rs_buckets[0].1).unwrap(); + let groupby_vtable2 = groupby_vtable.new_with_buckets(&rs_buckets[1].1).unwrap(); + + let sub_plan = query.exec_plan.take_subtree(top_id).unwrap(); + // after take subtree motion id has changed + let (groupby_motion_id, _) = sub_plan + .get_ir_plan() + .nodes + .iter() + .find_position(|n| { + matches!( + n, + Node::Relational(Relational::Motion { + policy: MotionPolicy::Segment(_), + .. + }) + ) + }) + .unwrap(); + let (sq_motion_id, _) = sub_plan + .get_ir_plan() + .nodes + .iter() + .find_position(|n| { + matches!( + n, + Node::Relational(Relational::Motion { + policy: MotionPolicy::Full, + .. 
+ }) + ) + }) + .unwrap(); + + let actual_dispatch = coordinator.detailed_dispatch(sub_plan, &buckets); + + let expected = vec![ + ReplicasetDispatchInfo { + rs_id: 0, + pattern: r#" select null where false UNION ALL SELECT "column_51" as "f" FROM (SELECT "column_51" FROM "TMP_test_35") GROUP BY "column_51""#.to_string(), + params: vec![], + vtables_map: collection!(groupby_motion_id => Rc::new(groupby_vtable1)), + }, + ReplicasetDispatchInfo { + rs_id: 1, + pattern: r#"SELECT "global_t"."a" FROM "global_t" WHERE ("global_t"."b") in (SELECT "f" FROM "TMP_test_14") UNION ALL SELECT "column_51" as "f" FROM (SELECT "column_51" FROM "TMP_test_35") GROUP BY "column_51""#.to_string(), + params: vec![], + vtables_map: collection!(sq_motion_id => Rc::new(sq_vtable), groupby_motion_id => Rc::new(groupby_vtable2)), + }, + ]; + + assert_eq!(expected, actual_dispatch); +} + +#[test] +fn global_union_all4() { + let sql = r#" + select "b" from "global_t" + union all + select * from ( + select "a" from "global_t" + union all + select "f" from "t2" + ) + "#; + let mut coordinator = RouterRuntimeMock::new(); + coordinator.set_vshard_mock(2); + + let mut query = Query::new(&coordinator, sql, vec![]).unwrap(); + + let top_id = query.exec_plan.get_ir_plan().get_top().unwrap(); + let buckets = query.bucket_discovery(top_id).unwrap(); + let sub_plan = query.exec_plan.take_subtree(top_id).unwrap(); + + let actual_dispatch = coordinator.detailed_dispatch(sub_plan, &buckets); + + let expected = vec![ + ReplicasetDispatchInfo { + rs_id: 0, + pattern: r#" select null where false UNION ALL SELECT "a" FROM (select null where false UNION ALL SELECT "t2"."f" FROM "t2")"#.to_string(), + params: vec![], + vtables_map: HashMap::new(), + }, + ReplicasetDispatchInfo { + rs_id: 1, + pattern: r#"SELECT "global_t"."b" FROM "global_t" UNION ALL SELECT "a" FROM (SELECT "global_t"."a" FROM "global_t" UNION ALL SELECT "t2"."f" FROM "t2")"#.to_string(), + params: vec![], + vtables_map: HashMap::new(), + }, + ]; + + assert_eq!(expected, actual_dispatch); +} diff --git a/sbroad-core/src/frontend/sql/ir/tests/global.rs b/sbroad-core/src/frontend/sql/ir/tests/global.rs index 06081bf979c59154e758b20e65a24675731fb46a..5df84052d35ca02945527e6cef9e90892f1840e8 100644 --- a/sbroad-core/src/frontend/sql/ir/tests/global.rs +++ b/sbroad-core/src/frontend/sql/ir/tests/global.rs @@ -19,11 +19,6 @@ fn front_sql_check_global_tbl_support() { let metadata = &RouterConfigurationMock::new(); - check_error( - r#"select "a" from "global_t" union all select * from (select "a" as "oa" from "t3")"#, - metadata, - global_tbl_err!("UnionAll"), - ); check_error( r#"select "a" from "global_t" except select * from (select "a" as "oa" from "t3")"#, metadata, @@ -1014,3 +1009,132 @@ vtable_max_rows = 5000 ); assert_eq!(expected_explain, plan.as_explain().unwrap()); } + +fn check_union_dist(plan: &Plan, expected_distributions: &[DistMock]) { + let filter = |id: usize| -> bool { + matches!( + plan.get_node(id), + Ok(Node::Relational(Relational::UnionAll { .. 
})) + ) + }; + let nodes = collect_relational(plan, Box::new(filter)); + check_distributions(plan, &nodes, expected_distributions); +} + +#[test] +fn front_sql_global_union_all1() { + let input = r#" + select "a", "b" from "global_t" + union all + select "e", "f" from "t2" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"union all + motion [policy: local] + projection ("global_t"."a"::integer -> "a", "global_t"."b"::integer -> "b") + scan "global_t" + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f") + scan "t2" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); + + check_union_dist(&plan, &[DistMock::Any]); +} + +#[test] +fn front_sql_global_union_all2() { + let input = r#" + select "a" from "global_t" + union all + select "e" from "t2" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"union all + motion [policy: local] + projection ("global_t"."a"::integer -> "a") + scan "global_t" + projection ("t2"."e"::unsigned -> "e") + scan "t2" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); + + check_union_dist(&plan, &[DistMock::Any]); +} + +#[test] +fn front_sql_global_union_all3() { + let input = r#" + select * from (select "a" from "global_t" + union all + select sum("e") from "t2") + union all + select "b" from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"union all + projection ("a"::integer -> "a") + scan + union all + projection ("global_t"."a"::integer -> "a") + scan "global_t" + motion [policy: segment([ref("COL_1")])] + projection (sum(("sum_23"::decimal))::decimal -> "COL_1") + motion [policy: full] + scan + projection (sum(("t2"."e"::unsigned))::decimal -> "sum_23") + scan "t2" + motion [policy: local] + projection ("global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); + + check_union_dist(&plan, &[DistMock::Any, DistMock::Any]); +} + +#[test] +fn front_sql_global_union_all5() { + let input = r#" + select "a" from "global_t" + union all + select "b" from "global_t" + "#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"union all + projection ("global_t"."a"::integer -> "a") + scan "global_t" + projection ("global_t"."b"::integer -> "b") + scan "global_t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); + + check_union_dist(&plan, &[DistMock::Global]); +} diff --git a/sbroad-core/src/ir/distribution.rs b/sbroad-core/src/ir/distribution.rs index 76b80439418f1eb2c7764bc2dd8d8468c1610fd5..1e8b321d85b937cc27e6fe9db8f86632927d35f1 100644 --- a/sbroad-core/src/ir/distribution.rs +++ b/sbroad-core/src/ir/distribution.rs @@ -113,17 +113,48 @@ pub enum Distribution { } impl Distribution { + fn union(left: &Distribution, right: &Distribution) -> Result<Distribution, SbroadError> { + let dist = match (left, right) { + ( + Distribution::Global | Distribution::Any, + Distribution::Any | Distribution::Segment { .. }, + ) + | ( + Distribution::Any | Distribution::Segment { .. 
}, + Distribution::Global | Distribution::Any, + ) => Distribution::Any, + (Distribution::Global, Distribution::Global) => Distribution::Global, + (Distribution::Single, _) | (_, Distribution::Single) => { + return Err(SbroadError::Invalid( + Entity::Distribution, + Some(format!("union child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))); + } + ( + Distribution::Segment { + keys: keys_left, .. + }, + Distribution::Segment { + keys: keys_right, .. + }, + ) => { + let mut keys: HashSet<Key, RepeatableState> = HashSet::with_hasher(RepeatableState); + for key in keys_left.intersection(keys_right).iter() { + keys.insert(Key::new(key.positions.clone())); + } + if keys.is_empty() { + Distribution::Any + } else { + Distribution::Segment { keys: keys.into() } + } + } + }; + Ok(dist) + } + /// Calculate a new distribution for the `Except` and `UnionAll` output tuple. /// Single - fn union_except( - left: &Distribution, - right: &Distribution, - ) -> Result<Distribution, SbroadError> { + fn except(left: &Distribution, right: &Distribution) -> Result<Distribution, SbroadError> { let dist = match (left, right) { - // Currently Global distribution may come - // only from Motion(Full). The check for - // reading from global tables is done in - // conflict resolution. (Distribution::Global, _) => right.clone(), (_, Distribution::Global) => left.clone(), (Distribution::Single, _) | (_, Distribution::Single) => { @@ -681,9 +712,8 @@ impl Plan { let parent = self.get_relation_node(parent_id)?; let new_dist = match parent { - Relational::Except { .. } | Relational::UnionAll { .. } => { - Distribution::union_except(&left_dist, &right_dist)? - } + Relational::Except { .. } => Distribution::except(&left_dist, &right_dist)?, + Relational::UnionAll { .. } => Distribution::union(&left_dist, &right_dist)?, Relational::Join { .. } => Distribution::join(&left_dist, &right_dist)?, _ => { return Err(SbroadError::Invalid( diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index 6e8b85be66ab65bc3dd9491c68964e0596673cc3..7ed29cadb4816c096c69b0280ec822793c197623 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -132,6 +132,21 @@ pub enum MotionOpcode { AddMissingRowsForLeftJoin { motion_id: usize, }, + /// When set to `true` this opcode serializes + /// motion subtree to sql that produces + /// empty table. + /// + /// Relevant only for Local motion policy. + /// Must be initialized to `true` by planner, + /// executor garuantees to mark only one replicaset + /// which will have `false` value in this opcode. + /// For all replicasets that will have `true` value, + /// executor will remove all virtual tables used in + /// below subtree and unlink the given motion node. + /// + /// Note: currently this opcode is only used for + /// execution of union all having global child and sharded child. + SerializeAsEmptyTable(bool), } /// Helper struct that unwraps `Expression::Bool` fields. @@ -1002,7 +1017,7 @@ impl Plan { )); } } - Relational::UnionAll { .. } | Relational::Except { .. } => { + Relational::Except { .. } => { let left_dist = self.get_distribution( self.get_relational_output(self.get_relational_child(rel_id, 0)?)?, )?; @@ -1026,7 +1041,8 @@ impl Plan { | Relational::Selection { .. } | Relational::ValuesRow { .. } | Relational::ScanSubQuery { .. } - | Relational::Motion { .. } => {} + | Relational::Motion { .. } + | Relational::UnionAll { .. 
} => {} } Ok(()) } @@ -1861,56 +1877,85 @@ impl Plan { } fn resolve_union_conflicts(&mut self, rel_id: usize) -> Result<Strategy, SbroadError> { - let mut map = Strategy::new(rel_id); - match self.get_relation_node(rel_id)? { - Relational::UnionAll { children, .. } => { - if let (Some(left), Some(right), None) = - (children.first(), children.get(1), children.get(2)) - { - let left_output_id = self.get_relation_node(*left)?.output(); - let right_output_id = self.get_relation_node(*right)?.output(); - let left_output_row = - self.get_expression_node(left_output_id)?.get_row_list()?; - let right_output_row = - self.get_expression_node(right_output_id)?.get_row_list()?; - if left_output_row.len() != right_output_row.len() { - return Err(SbroadError::UnexpectedNumberOfValues(format!( - "Except node children have different row lengths: left {}, right {}", - left_output_row.len(), - right_output_row.len() - ))); - } - let left_dist = self.get_distribution(left_output_id)?; - let right_dist = self.get_distribution(right_output_id)?; - if let Distribution::Single = left_dist { - map.add_child( - *left, - MotionPolicy::Segment(MotionKey { - targets: vec![Target::Reference(0)], - }), - Program::default(), - ); - } - if let Distribution::Single = right_dist { - map.add_child( - *right, - MotionPolicy::Segment(MotionKey { - targets: vec![Target::Reference(0)], - }), - Program::default(), - ); - } - return Ok(map); - } - Err(SbroadError::UnexpectedNumberOfValues( - "UnionAll node doesn't have exactly two children.".into(), - )) - } - _ => Err(SbroadError::Invalid( + if !matches!(self.get_relation_node(rel_id)?, Relational::UnionAll { .. }) { + return Err(SbroadError::Invalid( Entity::Relational, Some("expected UnionAll node".into()), - )), + )); + } + let mut map = Strategy::new(rel_id); + let left_id = self.get_relational_child(rel_id, 0)?; + let right_id = self.get_relational_child(rel_id, 1)?; + let left_output_id = self.get_relation_node(left_id)?.output(); + let right_output_id = self.get_relation_node(right_id)?.output(); + + { + let left_output_row = self.get_expression_node(left_output_id)?.get_row_list()?; + let right_output_row = self.get_expression_node(right_output_id)?.get_row_list()?; + if left_output_row.len() != right_output_row.len() { + return Err(SbroadError::UnexpectedNumberOfValues(format!( + "Except node children have different row lengths: left {}, right {}", + left_output_row.len(), + right_output_row.len() + ))); + } + } + + let left_dist = self.get_distribution(left_output_id)?; + let right_dist = self.get_distribution(right_output_id)?; + match (left_dist, right_dist) { + (Distribution::Single, Distribution::Single) => { + // todo: do not use a motion here + map.add_child( + left_id, + MotionPolicy::Segment(MotionKey { + targets: vec![Target::Reference(0)], + }), + Program::default(), + ); + map.add_child( + right_id, + MotionPolicy::Segment(MotionKey { + targets: vec![Target::Reference(0)], + }), + Program::default(), + ); + } + (Distribution::Single, _) => { + map.add_child( + left_id, + MotionPolicy::Segment(MotionKey { + targets: vec![Target::Reference(0)], + }), + Program::default(), + ); + } + (_, Distribution::Single) => { + map.add_child( + right_id, + MotionPolicy::Segment(MotionKey { + targets: vec![Target::Reference(0)], + }), + Program::default(), + ); + } + (Distribution::Global, Distribution::Segment { .. 
} | Distribution::Any) => { + map.add_child( + left_id, + MotionPolicy::Local, + Program(vec![MotionOpcode::SerializeAsEmptyTable(true)]), + ); + } + (Distribution::Segment { .. } | Distribution::Any, Distribution::Global) => { + map.add_child( + right_id, + MotionPolicy::Local, + Program(vec![MotionOpcode::SerializeAsEmptyTable(true)]), + ); + } + (_, _) => {} } + Ok(map) } /// Add motion nodes to the plan tree. diff --git a/sbroad-core/src/ir/tree/relation.rs b/sbroad-core/src/ir/tree/relation.rs index 06441067f173ab86cac80a075c21417f7bfa86cb..a87b32d271abcceb9460b07384b887afa8801090 100644 --- a/sbroad-core/src/ir/tree/relation.rs +++ b/sbroad-core/src/ir/tree/relation.rs @@ -25,6 +25,15 @@ impl<'n> Nodes { nodes: self, } } + + #[must_use] + pub fn empty_rel_iter(&'n self) -> RelationalIterator<'n> { + RelationalIterator { + current: self.next_id(), + child: RefCell::new(1000), + nodes: self, + } + } } impl<'nodes> TreeIterator<'nodes> for RelationalIterator<'nodes> {
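
To make the `SerializeAsEmptyTable` mechanics concrete, here is the per-replicaset SQL this patch is expected to produce for the union-all case from the RFC section above. This is a sketch taken from the `global_union_all` test expectations in this diff (two replicasets, tables `"global_t"` and `"t2"`), not output from a real cluster:

```sql
-- Replicaset chosen to materialize the global branch (opcode disabled there):
SELECT "global_t"."a", "global_t"."b" FROM "global_t"
UNION ALL
SELECT "t2"."e", "t2"."f" FROM "t2";

-- Every other replicaset receives the global branch serialized as an empty table,
-- one NULL per output column of the motion node:
select null, null where false
UNION ALL
SELECT "t2"."e", "t2"."f" FROM "t2";
```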