diff --git a/src/ir.rs b/src/ir.rs index 2fb6f345542d1a081691cebcb80a52adddd90b81..5e134826e32a93abb959928cb90c38d4e3c8b9c2 100644 --- a/src/ir.rs +++ b/src/ir.rs @@ -2,18 +2,20 @@ //! //! Contains the logical plan tree and helpers. +pub mod distribution; pub mod expression; pub mod operator; pub mod relation; pub mod value; use crate::errors::QueryPlannerError; +use distribution::Distribution; use expression::Expression; use operator::Relational; use relation::Table; use serde::{Deserialize, Serialize}; use std::cell::RefCell; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; /// Plan tree node. /// @@ -162,6 +164,245 @@ impl Plan { } map } + + /// Calculate and set tuple distribution. + /// + /// As the references in the `Row` expression contain only logical ID of the parent relational nodes, + /// we need at first traverse all the plan nodes and build a "logical id - array position" map with + /// `relational_id_map()` function and pass its reference to this function. + /// + /// # Errors + /// Returns `QueryPlannerError` when current expression is not a `Row` or contains broken references. + pub fn set_distribution<S: ::std::hash::BuildHasher + Default>( + &mut self, + row_node: usize, + id_map: &HashMap<usize, usize, S>, + ) -> Result<(), QueryPlannerError> { + let mut child_set: HashSet<usize> = HashSet::new(); + let mut child_pos_map: HashMap<(usize, usize), usize> = HashMap::new(); + let mut table_set: HashSet<String> = HashSet::new(); + let mut table_pos_map: HashMap<usize, usize> = HashMap::default(); + let mut parent_node: Option<usize> = None; + + if let Node::Expression(Expression::Row { list, .. }) = self.get_node(row_node)? { + // Gather information about children nodes, that are pointed by the row references. + for (pos, alias_node) in list.iter().enumerate() { + if let Node::Expression(Expression::Alias { child, .. }) = self.get_node(*alias_node)? { + if let Node::Expression(Expression::Reference { + targets, + position, + parent, + .. + }) = self.get_node(*child)? + { + // Get the relational node, containing this row + parent_node = Some(*id_map.get(parent).ok_or(QueryPlannerError::InvalidNode)?); + if let Node::Relational(relational_op) = + self.get_node(parent_node.ok_or(QueryPlannerError::InvalidNode)?)? + { + if let Some(children) = relational_op.children() { + // References in the branch node. + let child_pos_list: &Vec<usize> = targets + .as_ref() + .ok_or(QueryPlannerError::InvalidReference)?; + for target in child_pos_list { + let child_node: usize = *children + .get(*target) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + child_set.insert(child_node); + child_pos_map.insert((child_node, *position), pos); + } + } else { + // References in the leaf (relation scan) node. + if targets.is_some() { + return Err(QueryPlannerError::InvalidReference); + } + if let Relational::ScanRelation { relation, .. } = relational_op { + table_set.insert(relation.clone()); + table_pos_map.insert(*position, pos); + } else { + return Err(QueryPlannerError::InvalidReference); + } + } + } else { + return Err(QueryPlannerError::InvalidNode); + } + } + } + } + } + + match child_set.len() { + 0 => { + // Scan + self.set_scan_dist(&table_set, &table_pos_map, row_node)?; + } + 1 => { + // Single child + let child: usize = *child_set + .iter() + .next() + .ok_or(QueryPlannerError::InvalidNode)?; + let suggested_dist = self.dist_from_child(child, &child_pos_map)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = self + .nodes + .get_mut(row_node) + .ok_or(QueryPlannerError::InvalidRow)? + { + *distribution = Some(suggested_dist); + } + } + 2 => { + // Union, join + self.set_two_children_node_dist( + &child_set, + &child_pos_map, + &parent_node, + row_node, + )?; + } + _ => return Err(QueryPlannerError::InvalidReference), + } + Ok(()) + } + + // Private methods + + fn dist_from_child( + &self, + child_rel_node: usize, + child_pos_map: &HashMap<(usize, usize), usize>, + ) -> Result<Distribution, QueryPlannerError> { + if let Node::Relational(relational_op) = self.get_node(child_rel_node)? { + if let Node::Expression(Expression::Row { + distribution: child_dist, + .. + }) = self.get_node(relational_op.output())? + { + match child_dist { + None => return Err(QueryPlannerError::UninitializedDistribution), + Some(Distribution::Random) => return Ok(Distribution::Random), + Some(Distribution::Replicated) => return Ok(Distribution::Replicated), + Some(Distribution::Coordinator) => return Ok(Distribution::Coordinator), + Some(Distribution::Segment { key }) => { + let mut new_key: Vec<usize> = Vec::new(); + let all_found = key.iter().all(|pos| { + child_pos_map.get(&(child_rel_node, *pos)).map_or(false, |v| { + new_key.push(*v); + true + }) + }); + if all_found { + return Ok(Distribution::Segment { key: new_key }); + } + return Ok(Distribution::Random); + } + } + } + } + Err(QueryPlannerError::InvalidRow) + } + + fn set_scan_dist( + &mut self, + table_set: &HashSet<String>, + table_pos_map: &HashMap<usize, usize>, + row_node: usize, + ) -> Result<(), QueryPlannerError> { + if table_set.len() != 1 { + return Err(QueryPlannerError::InvalidNode); + } + if let Some(relations) = &self.relations { + let table_name: &str = table_set + .iter() + .next() + .ok_or(QueryPlannerError::InvalidNode)?; + let table: &Table = relations + .get(table_name) + .ok_or(QueryPlannerError::InvalidRelation)?; + match table { + Table::Segment { key, .. } | Table::VirtualSegment { key, .. } => { + let mut new_key: Vec<usize> = Vec::new(); + let all_found = key.iter().all(|pos| { + table_pos_map.get(pos).map_or(false, |v| { + new_key.push(*v); + true + }) + }); + if all_found { + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = self + .nodes + .get_mut(row_node) + .ok_or(QueryPlannerError::InvalidRow)? + { + *distribution = Some(Distribution::Segment { key: new_key }); + } + } + } + Table::Virtual { .. } => { + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = self + .nodes + .get_mut(row_node) + .ok_or(QueryPlannerError::InvalidRow)? + { + *distribution = Some(Distribution::Random); + } + } + } + Ok(()) + } else { + Err(QueryPlannerError::InvalidPlan) + } + } + + fn set_two_children_node_dist( + &mut self, + child_set: &HashSet<usize>, + child_pos_map: &HashMap<(usize, usize), usize>, + parent_node: &Option<usize>, + row_node: usize, + ) -> Result<(), QueryPlannerError> { + let mut child_set_iter = child_set.iter(); + let left_child = *child_set_iter + .next() + .ok_or(QueryPlannerError::InvalidNode)?; + let right_child = *child_set_iter + .next() + .ok_or(QueryPlannerError::InvalidNode)?; + + let is_union_all: bool = matches!( + self.get_node(parent_node.ok_or(QueryPlannerError::InvalidNode)?)?, + Node::Relational(Relational::UnionAll { .. }) + ); + + if is_union_all { + let left_dist = self.dist_from_child(left_child, child_pos_map)?; + let right_dist = self.dist_from_child(right_child, child_pos_map)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = self + .nodes + .get_mut(row_node) + .ok_or(QueryPlannerError::InvalidRow)? + { + *distribution = Some(Distribution::new_union(left_dist, right_dist)?); + } + } else { + // TODO: implement join + return Err(QueryPlannerError::InvalidNode); + } + Ok(()) + } } /// Plan node iterator over its branches. diff --git a/src/ir/distribution.rs b/src/ir/distribution.rs new file mode 100644 index 0000000000000000000000000000000000000000..32b25600b6e68cffeb798f3ae7f7de64c276d6b1 --- /dev/null +++ b/src/ir/distribution.rs @@ -0,0 +1,74 @@ +use crate::errors::QueryPlannerError; +use serde::{Deserialize, Serialize}; + +/// Tuple data chunk distribution policy in the cluster. +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub enum Distribution { + /// Only coordinator contains all the data. + /// + /// Example: insertion to the virtual table on coordinator. + Coordinator, + /// Each segment contains a random portion of data. + /// + /// Example: "erased" distribution key by projection operator. + Random, + /// Each segment contains a full copy of data. + /// + /// Example: constant expressions. + Replicated, + /// Each segment contains a portion of data, + /// determined by the distribution key rule. + /// + /// Example: segmented table. + Segment { + /// A list of column positions in the output tuple, + /// that for a distribution key. + key: Vec<usize>, + }, +} + +impl Distribution { + /// Calculate a new distribution for the `UnionAll` output tuple. + /// + /// # Errors + /// Returns `QueryPlannerError`: + /// - distribution conflict that should be resolved by adding a `Motion` node + pub fn new_union( + left: Distribution, + right: Distribution, + ) -> Result<Distribution, QueryPlannerError> { + match left { + Distribution::Coordinator => match right { + Distribution::Coordinator => Ok(left), + _ => Err(QueryPlannerError::RequireMotion), + }, + Distribution::Random => match right { + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + _ => Ok(left), + }, + Distribution::Replicated => match right { + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + _ => Ok(right), + }, + Distribution::Segment { + key: ref left_key, .. + } => match right { + Distribution::Random => Ok(Distribution::Random), + Distribution::Replicated => Ok(left), + Distribution::Segment { + key: ref right_key, .. + } => { + if left_key.iter().zip(right_key.iter()).all(|(l, r)| l == r) { + Ok(left) + } else { + Ok(Distribution::Random) + } + } + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + }, + } + } +} + +#[cfg(test)] +mod tests; diff --git a/src/ir/expression/tests.rs b/src/ir/distribution/tests.rs similarity index 88% rename from src/ir/expression/tests.rs rename to src/ir/distribution/tests.rs index d066169ebc300e8be4107c0885f8077bb1245001..c70993751515370064535b3d98d7d9b1e9b95a07 100644 --- a/src/ir/expression/tests.rs +++ b/src/ir/distribution/tests.rs @@ -37,7 +37,7 @@ fn proj_preserve_dist_key() { } else { panic!("Invalid plan!"); }; - set_distribution(scan_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![1, 0] }, @@ -50,7 +50,7 @@ fn proj_preserve_dist_key() { } else { panic!("Invalid plan!"); }; - set_distribution(proj_output, &map, &mut plan).unwrap(); + plan.set_distribution(proj_output, &map).unwrap(); if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![1, 0] }, @@ -77,7 +77,7 @@ fn proj_shuffle_dist_key() { let scan_output = 8; let proj_output = 14; - set_distribution(scan_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![1, 0] }, @@ -85,7 +85,7 @@ fn proj_shuffle_dist_key() { ); } - set_distribution(proj_output, &map, &mut plan).unwrap(); + plan.set_distribution(proj_output, &map).unwrap(); if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![0, 1] }, @@ -112,7 +112,7 @@ fn proj_shrink_dist_key_1() { let scan_output = 8; let proj_output = 14; - set_distribution(scan_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![1, 0] }, @@ -120,7 +120,7 @@ fn proj_shrink_dist_key_1() { ); } - set_distribution(proj_output, &map, &mut plan).unwrap(); + plan.set_distribution(proj_output, &map).unwrap(); if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() { assert_eq!(&Distribution::Random, proj_row.distribution().unwrap()); } @@ -144,7 +144,7 @@ fn proj_shrink_dist_key_2() { let scan_output = 8; let proj_output = 12; - set_distribution(scan_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![1, 0] }, @@ -152,7 +152,7 @@ fn proj_shrink_dist_key_2() { ); } - set_distribution(proj_output, &map, &mut plan).unwrap(); + plan.set_distribution(proj_output, &map).unwrap(); if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() { assert_eq!(&Distribution::Random, proj_row.distribution().unwrap()); } @@ -178,7 +178,7 @@ fn union_all_fallback_to_random() { let scan_t2_output = 10; let union_output = 16; - set_distribution(scan_t1_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_t1_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_t1_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![0] }, @@ -186,7 +186,7 @@ fn union_all_fallback_to_random() { ); } - set_distribution(scan_t2_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_t2_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_t2_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![1] }, @@ -194,7 +194,7 @@ fn union_all_fallback_to_random() { ); } - set_distribution(union_output, &map, &mut plan).unwrap(); + plan.set_distribution(union_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(union_output).unwrap() { assert_eq!(&Distribution::Random, scan_row.distribution().unwrap()); } @@ -220,7 +220,7 @@ fn union_preserve_dist() { let scan_t2_output = 10; let union_output = 16; - set_distribution(scan_t1_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_t1_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_t1_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![0] }, @@ -228,7 +228,7 @@ fn union_preserve_dist() { ); } - set_distribution(scan_t2_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_t2_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(scan_t2_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![0] }, @@ -236,7 +236,7 @@ fn union_preserve_dist() { ); } - set_distribution(union_output, &map, &mut plan).unwrap(); + plan.set_distribution(union_output, &map).unwrap(); if let Node::Expression(scan_row) = plan.get_node(union_output).unwrap() { assert_eq!( &Distribution::Segment { key: vec![0] }, diff --git a/src/ir/expression.rs b/src/ir/expression.rs index 2c22ebc4d3b07d6f9c6f6bd8d94c9f36dcba5f80..82556651748cf211f19ef661b60af3197a4ff19f 100644 --- a/src/ir/expression.rs +++ b/src/ir/expression.rs @@ -5,83 +5,11 @@ //! - the order of the columns (and we can get their types as well) //! - distribution of the data in the tuple +use super::distribution::Distribution; use super::operator; -use super::operator::Relational; -use super::relation::Table; use super::value::Value; -use super::{Node, Plan}; use crate::errors::QueryPlannerError; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; - -/// Tuple data chunk distribution policy in the cluster. -#[derive(Serialize, Deserialize, PartialEq, Debug)] -pub enum Distribution { - /// Only coordinator contains all the data. - /// - /// Example: insertion to the virtual table on coordinator. - Coordinator, - /// Each segment contains a random portion of data. - /// - /// Example: "erased" distribution key by projection operator. - Random, - /// Each segment contains a full copy of data. - /// - /// Example: constant expressions. - Replicated, - /// Each segment contains a portion of data, - /// determined by the distribution key rule. - /// - /// Example: segmented table. - Segment { - /// A list of column positions in the output tuple, - /// that for a distribution key. - key: Vec<usize>, - }, -} - -impl Distribution { - /// Calculate a new distribution for the `UnionAll` output tuple. - /// - /// # Errors - /// Returns `QueryPlannerError`: - /// - distribution conflict that should be resolved by adding a `Motion` node - pub fn new_union( - left: Distribution, - right: Distribution, - ) -> Result<Distribution, QueryPlannerError> { - match left { - Distribution::Coordinator => match right { - Distribution::Coordinator => Ok(left), - _ => Err(QueryPlannerError::RequireMotion), - }, - Distribution::Random => match right { - Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), - _ => Ok(left), - }, - Distribution::Replicated => match right { - Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), - _ => Ok(right), - }, - Distribution::Segment { - key: ref left_key, .. - } => match right { - Distribution::Random => Ok(Distribution::Random), - Distribution::Replicated => Ok(left), - Distribution::Segment { - key: ref right_key, .. - } => { - if left_key.iter().zip(right_key.iter()).all(|(l, r)| l == r) { - Ok(left) - } else { - Ok(Distribution::Random) - } - } - Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), - }, - } - } -} /// Tuple tree build blocks. /// @@ -151,244 +79,6 @@ pub enum Expression { }, } -fn dist_suggested_by_child( - child: usize, - plan: &Plan, - child_pos_map: &HashMap<(usize, usize), usize>, -) -> Result<Distribution, QueryPlannerError> { - if let Node::Relational(relational_op) = plan.get_node(child)? { - if let Node::Expression(Expression::Row { - distribution: child_dist, - .. - }) = plan.get_node(relational_op.output())? - { - match child_dist { - None => return Err(QueryPlannerError::UninitializedDistribution), - Some(Distribution::Random) => return Ok(Distribution::Random), - Some(Distribution::Replicated) => return Ok(Distribution::Replicated), - Some(Distribution::Coordinator) => return Ok(Distribution::Coordinator), - Some(Distribution::Segment { key }) => { - let mut new_key: Vec<usize> = Vec::new(); - let all_found = key.iter().all(|pos| { - child_pos_map.get(&(child, *pos)).map_or(false, |v| { - new_key.push(*v); - true - }) - }); - if all_found { - return Ok(Distribution::Segment { key: new_key }); - } - return Ok(Distribution::Random); - } - } - } - } - Err(QueryPlannerError::InvalidRow) -} - -fn set_scan_tuple_distribution( - plan: &mut Plan, - table_set: &HashSet<String>, - table_pos_map: &HashMap<usize, usize>, - row_node: usize, -) -> Result<(), QueryPlannerError> { - if table_set.len() != 1 { - return Err(QueryPlannerError::InvalidNode); - } - if let Some(relations) = &plan.relations { - let table_name: &str = table_set - .iter() - .next() - .ok_or(QueryPlannerError::InvalidNode)?; - let table: &Table = relations - .get(table_name) - .ok_or(QueryPlannerError::InvalidRelation)?; - match table { - Table::Segment { key, .. } | Table::VirtualSegment { key, .. } => { - let mut new_key: Vec<usize> = Vec::new(); - let all_found = key.iter().all(|pos| { - table_pos_map.get(pos).map_or(false, |v| { - new_key.push(*v); - true - }) - }); - if all_found { - if let Node::Expression(Expression::Row { - ref mut distribution, - .. - }) = plan - .nodes - .get_mut(row_node) - .ok_or(QueryPlannerError::InvalidRow)? - { - *distribution = Some(Distribution::Segment { key: new_key }); - } - } - } - Table::Virtual { .. } => { - if let Node::Expression(Expression::Row { - ref mut distribution, - .. - }) = plan - .nodes - .get_mut(row_node) - .ok_or(QueryPlannerError::InvalidRow)? - { - *distribution = Some(Distribution::Random); - } - } - } - Ok(()) - } else { - Err(QueryPlannerError::InvalidPlan) - } -} - -fn set_double_children_node_tuple_distribution( - plan: &mut Plan, - child_set: &HashSet<usize>, - child_pos_map: &HashMap<(usize, usize), usize>, - parent_node: &Option<usize>, - row_node: usize, -) -> Result<(), QueryPlannerError> { - let mut child_set_iter = child_set.iter(); - let left_child = *child_set_iter - .next() - .ok_or(QueryPlannerError::InvalidNode)?; - let right_child = *child_set_iter - .next() - .ok_or(QueryPlannerError::InvalidNode)?; - - let is_union_all: bool = matches!( - plan.get_node(parent_node.ok_or(QueryPlannerError::InvalidNode)?)?, - Node::Relational(Relational::UnionAll { .. }) - ); - - if is_union_all { - let left_dist = dist_suggested_by_child(left_child, plan, child_pos_map)?; - let right_dist = dist_suggested_by_child(right_child, plan, child_pos_map)?; - if let Node::Expression(Expression::Row { - ref mut distribution, - .. - }) = plan - .nodes - .get_mut(row_node) - .ok_or(QueryPlannerError::InvalidRow)? - { - *distribution = Some(Distribution::new_union(left_dist, right_dist)?); - } - } else { - // TODO: implement join - return Err(QueryPlannerError::InvalidNode); - } - Ok(()) -} - -/// Calculate and set tuple distribution. -/// -/// As the references in the `Row` expression contain only logical ID of the parent relational nodes, -/// we need at first traverse all the plan nodes and build a "logical id - array position" map with -/// `relational_id_map()` function and pass its reference to this function. -/// -/// # Errors -/// Returns `QueryPlannerError` when current expression is not a `Row` or contains broken references. -pub fn set_distribution<S: ::std::hash::BuildHasher + Default>( - row_node: usize, - id_map: &HashMap<usize, usize, S>, - plan: &mut Plan, -) -> Result<(), QueryPlannerError> { - let mut child_set: HashSet<usize> = HashSet::new(); - let mut child_pos_map: HashMap<(usize, usize), usize> = HashMap::new(); - let mut table_set: HashSet<String> = HashSet::new(); - let mut table_pos_map: HashMap<usize, usize> = HashMap::default(); - let mut parent_node: Option<usize> = None; - - if let Node::Expression(Expression::Row { list, .. }) = plan.get_node(row_node)? { - // Gather information about children nodes, that are pointed by the row references. - for (pos, alias_node) in list.iter().enumerate() { - if let Node::Expression(Expression::Alias { child, .. }) = plan.get_node(*alias_node)? { - if let Node::Expression(Expression::Reference { - targets, - position, - parent, - .. - }) = plan.get_node(*child)? - { - // Get the relational node, containing this row - parent_node = Some(*id_map.get(parent).ok_or(QueryPlannerError::InvalidNode)?); - if let Node::Relational(relational_op) = - plan.get_node(parent_node.ok_or(QueryPlannerError::InvalidNode)?)? - { - if let Some(children) = relational_op.children() { - // References in the branch node. - let child_pos_list: &Vec<usize> = targets - .as_ref() - .ok_or(QueryPlannerError::InvalidReference)?; - for target in child_pos_list { - let child_node: usize = *children - .get(*target) - .ok_or(QueryPlannerError::ValueOutOfRange)?; - child_set.insert(child_node); - child_pos_map.insert((child_node, *position), pos); - } - } else { - // References in the leaf (relation scan) node. - if targets.is_some() { - return Err(QueryPlannerError::InvalidReference); - } - if let Relational::ScanRelation { relation, .. } = relational_op { - table_set.insert(relation.clone()); - table_pos_map.insert(*position, pos); - } else { - return Err(QueryPlannerError::InvalidReference); - } - } - } else { - return Err(QueryPlannerError::InvalidNode); - } - } - } - } - } - - match child_set.len() { - 0 => { - // Scan - set_scan_tuple_distribution(plan, &table_set, &table_pos_map, row_node)?; - } - 1 => { - // Single child - let child: usize = *child_set - .iter() - .next() - .ok_or(QueryPlannerError::InvalidNode)?; - let suggested_dist = dist_suggested_by_child(child, plan, &child_pos_map)?; - if let Node::Expression(Expression::Row { - ref mut distribution, - .. - }) = plan - .nodes - .get_mut(row_node) - .ok_or(QueryPlannerError::InvalidRow)? - { - *distribution = Some(suggested_dist); - } - } - 2 => { - // Union, join - set_double_children_node_tuple_distribution( - plan, - &child_set, - &child_pos_map, - &parent_node, - row_node, - )?; - } - _ => return Err(QueryPlannerError::InvalidReference), - } - Ok(()) -} - #[allow(dead_code)] impl Expression { /// Get current row distribution. @@ -445,6 +135,3 @@ impl Expression { Expression::Bool { left, op, right } } } - -#[cfg(test)] -mod tests; diff --git a/src/ir/operator/tests.rs b/src/ir/operator/tests.rs index 71cf3432e1984c1d030fdde742a2730ea824aada..d8431daa9e250cb6de57113002ca5bce1c501e21 100644 --- a/src/ir/operator/tests.rs +++ b/src/ir/operator/tests.rs @@ -1,5 +1,6 @@ use super::*; use crate::errors::QueryPlannerError; +use crate::ir::distribution::*; use crate::ir::expression::*; use crate::ir::relation::*; use crate::ir::value::*; @@ -45,7 +46,7 @@ fn scan_rel() { let map = plan.relational_id_map(); - set_distribution(scan_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_output, &map).unwrap(); if let Node::Expression(row) = plan.get_node(scan_output).unwrap() { assert_eq!( row.distribution().unwrap(), @@ -80,7 +81,7 @@ fn scan_rel_serialized() { let scan_output = 8; let map = plan.relational_id_map(); - set_distribution(scan_output, &map, &mut plan).unwrap(); + plan.set_distribution(scan_output, &map).unwrap(); let path = Path::new("") .join("tests")