diff --git a/src/errors.rs b/src/errors.rs index 41b2dcfe1079535d068e1dcafc5a225cf075a9ce..8009bb27e7f4cc76ed1b98e71471a756a835f740 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -4,39 +4,47 @@ use serde::Serialize; const BUCKET_ID_ERROR: &str = "field doesn't contains sharding key value"; const DUPLICATE_COLUMN_ERROR: &str = "duplicate column"; +const EMPTY_PLAN_RELATION: &str = "empty plan relations"; +const INCORRECT_BUCKET_ID_ERROR: &str = "incorrect bucket id"; const INVALID_BOOL_ERROR: &str = "invalid boolean"; const INVALID_NAME_ERROR: &str = "invalid name"; +const INVALID_NODE: &str = "invalid node"; const INVALID_NUMBER_ERROR: &str = "invalid number"; const INVALID_PLAN_ERROR: &str = "invalid plan"; const INVALID_RELATION_ERROR: &str = "invalid relation"; const INVALID_ROW_ERROR: &str = "invalid row"; const INVALID_SHARDING_KEY_ERROR: &str = "invalid sharding key"; const NOT_EQUAL_ROWS: &str = "not equal rows"; +const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented"; +const REQUIRE_MOTION: &str = "require motion"; const SERIALIZATION_ERROR: &str = "serialization"; const SIMPLE_QUERY_ERROR: &str = "query doesn't simple"; const SIMPLE_UNION_QUERY_ERROR: &str = "query doesn't simple union"; -const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented"; +const UNINITIALIZED_DISTRIBUTION: &str = "uninitialized distribution"; const VALUE_OUT_OF_RANGE_ERROR: &str = "value out of range"; -const INCORRECT_BUCKET_ID_ERROR: &str = "incorrect bucket id"; #[derive(Debug, Clone, PartialEq, Serialize)] pub enum QueryPlannerError { BucketIdError, DuplicateColumn, + EmptyPlanRelations, + IncorrectBucketIdError, InvalidBool, InvalidName, + InvalidNode, InvalidNumber, InvalidPlan, InvalidRelation, InvalidRow, InvalidShardingKey, NotEqualRows, + QueryNotImplemented, + RequireMotion, Serialization, SimpleQueryError, SimpleUnionQueryError, - QueryNotImplemented, + UninitializedDistribution, ValueOutOfRange, - IncorrectBucketIdError, } impl fmt::Display for QueryPlannerError { @@ -44,20 +52,24 @@ impl fmt::Display for QueryPlannerError { let p = match self { QueryPlannerError::BucketIdError => BUCKET_ID_ERROR, QueryPlannerError::DuplicateColumn => DUPLICATE_COLUMN_ERROR, + QueryPlannerError::EmptyPlanRelations => EMPTY_PLAN_RELATION, + QueryPlannerError::IncorrectBucketIdError => INCORRECT_BUCKET_ID_ERROR, QueryPlannerError::InvalidBool => INVALID_BOOL_ERROR, QueryPlannerError::InvalidName => INVALID_NAME_ERROR, + QueryPlannerError::InvalidNode => INVALID_NODE, QueryPlannerError::InvalidNumber => INVALID_NUMBER_ERROR, QueryPlannerError::InvalidPlan => INVALID_PLAN_ERROR, QueryPlannerError::InvalidRelation => INVALID_RELATION_ERROR, QueryPlannerError::InvalidRow => INVALID_ROW_ERROR, QueryPlannerError::InvalidShardingKey => INVALID_SHARDING_KEY_ERROR, QueryPlannerError::NotEqualRows => NOT_EQUAL_ROWS, + QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED, + QueryPlannerError::RequireMotion => REQUIRE_MOTION, QueryPlannerError::Serialization => SERIALIZATION_ERROR, QueryPlannerError::SimpleQueryError => SIMPLE_QUERY_ERROR, QueryPlannerError::SimpleUnionQueryError => SIMPLE_UNION_QUERY_ERROR, - QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED, + QueryPlannerError::UninitializedDistribution => UNINITIALIZED_DISTRIBUTION, QueryPlannerError::ValueOutOfRange => VALUE_OUT_OF_RANGE_ERROR, - QueryPlannerError::IncorrectBucketIdError => INCORRECT_BUCKET_ID_ERROR, }; write!(f, "{}", p) } diff --git a/src/ir.rs b/src/ir.rs index 9946783945d4810d76bd7aa80ed8495953b37468..fda06da32493a5ce10c4681a47ef2f751f7f51d1 100644 --- a/src/ir.rs +++ b/src/ir.rs @@ -8,7 +8,7 @@ pub mod relation; pub mod value; use crate::errors::QueryPlannerError; -use expression::Expression; +use expression::{Branch, Distribution, Expression}; use operator::Relational; use relation::Table; use serde::{Deserialize, Serialize}; @@ -33,6 +33,156 @@ pub enum Node { Relational(Relational), } +/// Suggested distribution by the child relational node. +/// A wrapper for `Expression::suggest_distribution()`; +/// used by relational nodes when they calculate their +/// distribution. +/// +/// # Errors +/// Returns `QueryPlannerError`: +/// - parent node's output is not a valid tuple +/// - child node is not relational +/// - child's output is not a valid tuple +fn child_dist( + output: usize, + child: usize, + branch: &Branch, + plan: &Plan, +) -> Result<Distribution, QueryPlannerError> { + // Get current output tuple column list + let aliases: &Vec<usize> = + if let Node::Expression(Expression::Row { list, .. }) = plan.get_node(output)? { + Ok(list) + } else { + Err(QueryPlannerError::InvalidRow) + }?; + + // Distribution suggested by the child. + if let Node::Relational(child_node) = plan.get_node(child)? { + if let Node::Expression(child_row) = plan.get_node(child_node.output())? { + Ok(child_row.suggest_distribution(branch, aliases, plan)?) + } else { + Err(QueryPlannerError::InvalidRow) + } + } else { + Err(QueryPlannerError::InvalidPlan) + } +} + +/// Set output tuple distribution for the node. +/// +/// # Errors +/// Returns `QueryPlannerError`: +/// - when node position doesn't exist in the plan node arena +/// - for nodes, that don't produce tuples (all expressions except `Row`) +/// - for relational nodes with invalid output or children +pub fn set_distribution(pointer: usize, plan: &mut Plan) -> Result<(), QueryPlannerError> { + match plan.get_node(pointer)? { + Node::Relational(relational) => { + match relational { + Relational::ScanRelation { + relation: table_name, + .. + } => { + if let Some(relations) = &plan.relations { + if let Some(rel) = relations.get(table_name) { + // Update output tuple distribution to the relation's one. + match rel { + Table::Segment { key, .. } | Table::VirtualSegment { key, .. } => { + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = + Some(Distribution::Segment { key: key.clone() }); + return Ok(()); + } + return Err(QueryPlannerError::InvalidRow); + } + Table::Virtual { .. } => { + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = Some(Distribution::Random); + return Ok(()); + } + return Err(QueryPlannerError::InvalidRow); + } + } + } + return Err(QueryPlannerError::InvalidRelation); + } + Err(QueryPlannerError::EmptyPlanRelations) + } + Relational::Projection { child, output, .. } + | Relational::Selection { child, output, .. } + | Relational::ScanSubQuery { child, output, .. } => { + let dist = child_dist(*output, *child, &Branch::Left, plan)?; + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = Some(dist); + return Ok(()); + } + Err(QueryPlannerError::InvalidNode) + } + Relational::UnionAll { + left, + right, + output, + .. + } => { + let left_dist = child_dist(*output, *left, &Branch::Both, plan)?; + + let right_dist = child_dist(*output, *right, &Branch::Both, plan)?; + + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = Some(Distribution::new_union(left_dist, right_dist)?); + return Ok(()); + } + Err(QueryPlannerError::InvalidRow) + } + // TODO: implement it! + Relational::InnerJoin { .. } | Relational::Motion { .. } => { + Err(QueryPlannerError::QueryNotImplemented) + } + } + } + // TODO: how should we implement it for the `Row`? + Node::Expression(_) => Err(QueryPlannerError::QueryNotImplemented), + } +} + /// Plan node "allocator". /// /// Inserts an element to the array and returns its position, diff --git a/src/ir/expression.rs b/src/ir/expression.rs index 7370ac5d2ff3dd5354daf2d04205f6b234a090ba..1cf579cb71543eeb7dc5c9af850d1a10ab5d8635 100644 --- a/src/ir/expression.rs +++ b/src/ir/expression.rs @@ -15,6 +15,10 @@ use std::collections::HashMap; /// Tuple data chunk distribution policy in the cluster. #[derive(Serialize, Deserialize, PartialEq, Debug)] pub enum Distribution { + /// Only coordinator contains all the data. + /// + /// Example: insertion to the virtual table on coordinator. + Coordinator, /// Each segment contains a random portion of data. /// /// Example: "erased" distribution key by projection operator. @@ -32,10 +36,49 @@ pub enum Distribution { /// that for a distribution key. key: Vec<usize>, }, - /// Only a single segment contains all the data. +} + +impl Distribution { + /// Calculate a new distribution for the `UnionAll` output tuple. /// - /// Example: insertion to the virtual table on coordinator. - Single, + /// # Errors + /// Returns `QueryPlannerError`: + /// - distribution conflict that should be resolved by adding a `Motion` node + pub fn new_union( + left: Distribution, + right: Distribution, + ) -> Result<Distribution, QueryPlannerError> { + match left { + Distribution::Coordinator => match right { + Distribution::Coordinator => Ok(left), + _ => Err(QueryPlannerError::RequireMotion), + }, + Distribution::Random => match right { + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + _ => Ok(left), + }, + Distribution::Replicated => match right { + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + _ => Ok(right), + }, + Distribution::Segment { + key: ref left_key, .. + } => match right { + Distribution::Random => Ok(Distribution::Random), + Distribution::Replicated => Ok(left), + Distribution::Segment { + key: ref right_key, .. + } => { + if left_key.iter().zip(right_key.iter()).all(|(l, r)| l == r) { + Ok(left) + } else { + Ok(Distribution::Random) + } + } + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + }, + } + } } /// Tree branch. @@ -133,8 +176,9 @@ pub enum Expression { Row { /// A list of the alias expression node indexes in the plan node arena. list: Vec<usize>, - /// Resulting data distribution of the tuple. - distribution: Distribution, + /// Resulting data distribution of the tuple. Should be filled as a part + /// of the last "add Motion" transformation. + distribution: Option<Distribution>, }, } @@ -147,8 +191,9 @@ impl Expression { /// This function is executed on the child's side. /// /// # Errors - /// Returns `QueryPlannerError` when aliases are invalid. - pub fn suggested_distribution( + /// Returns `QueryPlannerError` when aliases are invalid or a node doesn't know its + /// distribution yet. + pub fn suggest_distribution( &self, my_branch: &Branch, aliases: &[usize], @@ -158,7 +203,12 @@ impl Expression { ref distribution, .. } = self { - match *distribution { + let dist = match distribution { + Some(d) => &*d, + None => return Err(QueryPlannerError::UninitializedDistribution), + }; + + match dist { Distribution::Random => return Ok(Distribution::Random), Distribution::Replicated => return Ok(Distribution::Replicated), Distribution::Segment { ref key } => { @@ -206,7 +256,7 @@ impl Expression { } return Ok(Distribution::Random); } - Distribution::Single => return Ok(Distribution::Single), + Distribution::Coordinator => return Ok(Distribution::Coordinator), } } Err(QueryPlannerError::InvalidRow) @@ -215,11 +265,15 @@ impl Expression { /// Get current row distribution. /// /// # Errors - /// Returns `QueryPlannerError` when the function is called on - /// expression other than `Row`. + /// Returns `QueryPlannerError` when the function is called on expression + /// other than `Row` or a node doesn't know its distribution yet. pub fn distribution(&self) -> Result<&Distribution, QueryPlannerError> { if let Expression::Row { distribution, .. } = self { - return Ok(distribution); + let dist = match distribution { + Some(d) => d, + None => return Err(QueryPlannerError::UninitializedDistribution), + }; + return Ok(dist); } Err(QueryPlannerError::InvalidRow) } @@ -248,7 +302,7 @@ impl Expression { // TODO: check that doesn't contain top-level aliases with the same names /// Row expression constructor. #[must_use] - pub fn new_row(list: Vec<usize>, distribution: Distribution) -> Self { + pub fn new_row(list: Vec<usize>, distribution: Option<Distribution>) -> Self { Expression::Row { list, distribution } } diff --git a/src/ir/expression/tests.rs b/src/ir/expression/tests.rs index cb19c7478346d816417d580d16b4e7a468467679..1057cdca828698b5d4821751a73cbe777f7b338d 100644 --- a/src/ir/expression/tests.rs +++ b/src/ir/expression/tests.rs @@ -5,7 +5,7 @@ use std::fs; use std::path::Path; #[test] -fn suggested_distribution() { +fn suggest_distribution() { // Load a table "t (a, b, c, d)" distributed by ["b", "a"] // with a sec scan and three additional alias-reference pairs // for "a", "b" and "c". We want to see, what suggestions would @@ -16,7 +16,7 @@ fn suggested_distribution() { .join("artifactory") .join("ir") .join("expression") - .join("suggested_distribution.yaml"); + .join("suggest_distribution.yaml"); let s = fs::read_to_string(path).unwrap(); let plan = Plan::from_yaml(&s).unwrap(); @@ -30,7 +30,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Segment { key: vec![1, 0] }, output - .suggested_distribution(&Branch::Left, &[a, b, c], &plan) + .suggest_distribution(&Branch::Left, &[a, b, c], &plan) .unwrap() ); @@ -38,7 +38,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Segment { key: vec![0, 1] }, output - .suggested_distribution(&Branch::Left, &[b, a], &plan) + .suggest_distribution(&Branch::Left, &[b, a], &plan) .unwrap() ); @@ -46,7 +46,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Random, output - .suggested_distribution(&Branch::Left, &[c, a], &plan) + .suggest_distribution(&Branch::Left, &[c, a], &plan) .unwrap() ); @@ -54,7 +54,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Random, output - .suggested_distribution(&Branch::Left, &[a], &plan) + .suggest_distribution(&Branch::Left, &[a], &plan) .unwrap() ); @@ -62,7 +62,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Segment { key: vec![1, 0] }, output - .suggested_distribution(&Branch::Both, &[a, b, c], &plan) + .suggest_distribution(&Branch::Both, &[a, b, c], &plan) .unwrap() ); @@ -70,7 +70,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Random, output - .suggested_distribution(&Branch::Right, &[a, b, c], &plan) + .suggest_distribution(&Branch::Right, &[a, b, c], &plan) .unwrap() ); diff --git a/src/ir/operator.rs b/src/ir/operator.rs index 2bb4e66a999adb36581139ab6640ed7d5400299e..8b6dd8551573eb1a9984ec247930f14d5bf9fc83 100644 --- a/src/ir/operator.rs +++ b/src/ir/operator.rs @@ -1,6 +1,6 @@ //! Operators for expression transformations. -use super::expression::{Branch, Distribution, Expression}; +use super::expression::{Branch, Expression}; use super::relation::Table; use super::{vec_alloc, Node, Plan}; use crate::errors::QueryPlannerError; @@ -226,11 +226,7 @@ impl Relational { if let Some(relations) = &plan.relations { if let Some(rel) = relations.get(table_name) { match rel { - Table::Segment { - ref columns, - key, - name: _, - } => { + Table::Segment { ref columns, .. } => { let refs = columns .iter() .enumerate() @@ -247,10 +243,7 @@ impl Relational { return Ok(Relational::ScanRelation { output: vec_alloc( nodes, - Node::Expression(Expression::new_row( - refs, - Distribution::Segment { key: key.clone() }, - )), + Node::Expression(Expression::new_row(refs, None)), ), relation: String::from(table_name), }); @@ -266,7 +259,10 @@ impl Relational { /// New `Projection` constructor. /// /// # Errors - /// Returns `QueryPlannerError` when the child node is invalid. + /// Returns `QueryPlannerError`: + /// - child node is not relational + /// - child output tuple is invalid + /// - column name do not match the ones in the child output tuple pub fn new_proj( plan: &mut Plan, child: usize, @@ -274,32 +270,29 @@ impl Relational { ) -> Result<Self, QueryPlannerError> { let aliases = new_alias_nodes(plan, child, output, &Branch::Left)?; - if let Node::Relational(child_node) = plan.get_node(child)? { - if let Node::Expression(child_row) = plan.get_node(child_node.output())? { - let dist = child_row.suggested_distribution(&Branch::Left, &aliases, plan)?; - let new_output = vec_alloc( - &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), - ); - return Ok(Relational::Projection { - child, - output: new_output, - }); - } - } - Err(QueryPlannerError::InvalidPlan) + let new_output = vec_alloc( + &mut plan.nodes, + Node::Expression(Expression::new_row(aliases, None)), + ); + + Ok(Relational::Projection { + child, + output: new_output, + }) } /// New `Selection` constructor /// /// # Errors - /// Returns `QueryPlannerError` when the child or filter nodes are invalid. + /// Returns `QueryPlannerError`: + /// - filter expression is not boolean + /// - child node is not relational + /// - child output tuple is not valid pub fn new_select( plan: &mut Plan, child: usize, filter: usize, ) -> Result<Self, QueryPlannerError> { - // Check that filter node is a boolean expression. if let Node::Expression(Expression::Bool { .. }) = plan.get_node(filter)? { } else { return Err(QueryPlannerError::InvalidBool); @@ -310,32 +303,28 @@ impl Relational { } else { return Err(QueryPlannerError::InvalidRow); }; - let output: Vec<&str> = names.iter().map(|s| s as &str).collect(); let aliases = new_alias_nodes(plan, child, &output, &Branch::Left)?; - if let Node::Relational(child_node) = plan.get_node(child)? { - if let Node::Expression(child_row) = plan.get_node(child_node.output())? { - let dist = child_row.suggested_distribution(&Branch::Left, &aliases, plan)?; - let new_output = vec_alloc( - &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), - ); - return Ok(Relational::Selection { - child, - filter, - output: new_output, - }); - } - } - Err(QueryPlannerError::InvalidPlan) + let new_output = vec_alloc( + &mut plan.nodes, + Node::Expression(Expression::new_row(aliases, None)), + ); + + Ok(Relational::Selection { + child, + filter, + output: new_output, + }) } /// New `UnionAll` constructor. /// /// # Errors - /// Returns `QueryPlannerError` when the left or right nodes are invalid - /// or have a different column structure in the output tuples. + /// Returns `QueryPlannerError`: + /// - children nodes are not relational + /// - children tuples are invalid + /// - children tuples have mismatching structure pub fn new_union_all( plan: &mut Plan, left: usize, @@ -364,37 +353,11 @@ impl Relational { let col_names: Vec<&str> = left_names.iter().map(|s| s as &str).collect(); let aliases = new_alias_nodes(plan, left, &col_names, &Branch::Both)?; - let left_dist = if let Node::Relational(left_node) = plan.get_node(left)? { - match plan.get_node(left_node.output())? { - Node::Expression(left_row) => { - left_row.suggested_distribution(&Branch::Left, &aliases, plan)? - } - Node::Relational(_) => return Err(QueryPlannerError::InvalidRow), - } - } else { - return Err(QueryPlannerError::InvalidPlan); - }; - - let right_dist = if let Node::Relational(right_node) = plan.get_node(right)? { - match plan.get_node(right_node.output())? { - Node::Expression(right_row) => { - right_row.suggested_distribution(&Branch::Right, &aliases, plan)? - } - Node::Relational(_) => return Err(QueryPlannerError::InvalidRow), - } - } else { - return Err(QueryPlannerError::InvalidPlan); - }; - - let dist = if left_dist == right_dist { - left_dist - } else { - Distribution::Random - }; let output = vec_alloc( &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), + Node::Expression(Expression::new_row(aliases, None)), ); + Ok(Relational::UnionAll { left, right, @@ -405,7 +368,10 @@ impl Relational { /// New `ScanSubQuery` constructor. /// /// # Errors - /// Returns `QueryPlannerError` when the child node is invalid. + /// Returns `QueryPlannerError`: + /// - child node is not relational + /// - child node output is not a correct tuple + /// - `SubQuery` name is empty pub fn new_sub_query( plan: &mut Plan, child: usize, @@ -416,7 +382,6 @@ impl Relational { } else { return Err(QueryPlannerError::InvalidRow); }; - if alias.is_empty() { return Err(QueryPlannerError::InvalidName); } @@ -424,20 +389,9 @@ impl Relational { let col_names: Vec<&str> = names.iter().map(|s| s as &str).collect(); let aliases = new_alias_nodes(plan, child, &col_names, &Branch::Both)?; - let dist = if let Node::Relational(left_node) = plan.get_node(child)? { - match plan.get_node(left_node.output())? { - Node::Expression(left_row) => { - left_row.suggested_distribution(&Branch::Left, &aliases, plan)? - } - Node::Relational(_) => return Err(QueryPlannerError::InvalidRow), - } - } else { - return Err(QueryPlannerError::InvalidPlan); - }; - let output = vec_alloc( &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), + Node::Expression(Expression::new_row(aliases, None)), ); Ok(Relational::ScanSubQuery { diff --git a/src/ir/operator/tests.rs b/src/ir/operator/tests.rs index b501170b402cf1ce11927da0a4a54c988f8cf9b0..f94ca8d28c718d4e0924dc0d710b95fe38b974d0 100644 --- a/src/ir/operator/tests.rs +++ b/src/ir/operator/tests.rs @@ -3,6 +3,7 @@ use crate::errors::QueryPlannerError; use crate::ir::expression::*; use crate::ir::relation::*; use crate::ir::value::*; +use crate::ir::*; use itertools::Itertools; use pretty_assertions::assert_eq; use std::fs; @@ -33,7 +34,9 @@ fn scan_rel() { }, scan ); + assert_eq!(9, vec_alloc(&mut plan.nodes, Node::Relational(scan))); + set_distribution(9, &mut plan).unwrap(); if let Node::Expression(row) = plan.get_node(8).unwrap() { assert_eq!( *row.distribution().unwrap(), @@ -42,8 +45,6 @@ fn scan_rel() { } else { panic!("Wrong output node type!"); } - - assert_eq!(9, vec_alloc(&mut plan.nodes, Node::Relational(scan))); } #[test] @@ -66,6 +67,7 @@ fn scan_rel_serialized() { let scan = Relational::new_scan("t", &mut plan).unwrap(); plan.nodes.push(Node::Relational(scan)); plan.top = Some(9); + set_distribution(plan.top.unwrap(), &mut plan).unwrap(); let path = Path::new("") .join("tests") @@ -96,6 +98,8 @@ fn projection() { let scan = Relational::new_scan("t", &mut plan).unwrap(); let scan_id = vec_alloc(&mut plan.nodes, Node::Relational(scan)); + set_distribution(scan_id, &mut plan).unwrap(); + let proj_seg = Relational::new_proj(&mut plan, scan_id, &["b", "a"]).unwrap(); assert_eq!( Relational::Projection { @@ -104,6 +108,8 @@ fn projection() { }, proj_seg ); + let proj_seg_id = vec_alloc(&mut plan.nodes, Node::Relational(proj_seg)); + set_distribution(proj_seg_id, &mut plan).unwrap(); if let Node::Expression(row) = plan.get_node(14).unwrap() { assert_eq!( @@ -116,12 +122,14 @@ fn projection() { assert_eq!( Relational::Projection { child: scan_id, - output: 19 + output: 20 }, proj_rand ); + let proj_rand_id = vec_alloc(&mut plan.nodes, Node::Relational(proj_rand)); + set_distribution(proj_rand_id, &mut plan).unwrap(); - if let Node::Expression(row) = plan.get_node(19).unwrap() { + if let Node::Expression(row) = plan.get_node(20).unwrap() { assert_eq!(*row.distribution().unwrap(), Distribution::Random); } @@ -238,6 +246,7 @@ fn union_all() { let scan_t1 = Relational::new_scan("t1", &mut plan).unwrap(); let scan_t1_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t1)); + set_distribution(scan_t1_id, &mut plan).unwrap(); // Check fallback to random distribution let t2 = Table::new_seg( @@ -253,26 +262,42 @@ fn union_all() { let scan_t2 = Relational::new_scan("t2", &mut plan).unwrap(); let scan_t2_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t2)); + set_distribution(scan_t2_id, &mut plan).unwrap(); let union_all = Relational::new_union_all(&mut plan, scan_t1_id, scan_t2_id).unwrap(); - if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { - assert_eq!(Distribution::Random, *row.distribution().unwrap()); + let union_all_id = vec_alloc(&mut plan.nodes, Node::Relational(union_all)); + set_distribution(union_all_id, &mut plan).unwrap(); + + if let Node::Relational(union_all) = plan.get_node(union_all_id).unwrap() { + if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { + assert_eq!(Distribution::Random, *row.distribution().unwrap()); + } else { + panic!("Invalid output!"); + } } else { - panic!("Invalid output!"); + panic!("Invalid node!"); } // Check preserving the original distribution let scan_t3 = Relational::new_scan("t1", &mut plan).unwrap(); let scan_t3_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t3)); + set_distribution(scan_t3_id, &mut plan).unwrap(); let union_all = Relational::new_union_all(&mut plan, scan_t1_id, scan_t3_id).unwrap(); - if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { - assert_eq!( - Distribution::Segment { key: vec![0] }, - *row.distribution().unwrap() - ); + let union_all_id = vec_alloc(&mut plan.nodes, Node::Relational(union_all)); + set_distribution(union_all_id, &mut plan).unwrap(); + + if let Node::Relational(union_all) = plan.get_node(union_all_id).unwrap() { + if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { + assert_eq!( + Distribution::Segment { key: vec![0] }, + *row.distribution().unwrap() + ); + } else { + panic!("Invalid output!"); + } } else { - panic!("Invalid output!"); + panic!("Invalid node!"); } // Check errors for children with different column names diff --git a/tests/artifactory/ir/expression/suggested_distribution.yaml b/tests/artifactory/ir/expression/suggest_distribution.yaml similarity index 100% rename from tests/artifactory/ir/expression/suggested_distribution.yaml rename to tests/artifactory/ir/expression/suggest_distribution.yaml