From 9c3fe6053f23cc00f6e0a7573b62a638b3e12e80 Mon Sep 17 00:00:00 2001 From: Denis Smirnov <sd@picodata.io> Date: Thu, 9 Dec 2021 17:51:35 +0700 Subject: [PATCH] feat: calculate tuple distribution in a separate function Previously distribution was calculated in the node constructor. It is a wrong design as the tree would be transformed multiple times and there is no reason to recalculate distribution on each step. We need distribution only on the last transformation when we insert Motion nodes. So, this refactoring makes tuple distribution optional and moves distribution logic to a separate function 'set_distribution()'. --- src/errors.rs | 24 ++- src/ir.rs | 152 +++++++++++++++++- src/ir/expression.rs | 80 +++++++-- src/ir/expression/tests.rs | 16 +- src/ir/operator.rs | 128 +++++---------- src/ir/operator/tests.rs | 51 ++++-- ...ibution.yaml => suggest_distribution.yaml} | 0 7 files changed, 323 insertions(+), 128 deletions(-) rename tests/artifactory/ir/expression/{suggested_distribution.yaml => suggest_distribution.yaml} (100%) diff --git a/src/errors.rs b/src/errors.rs index 41b2dcfe10..8009bb27e7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -4,39 +4,47 @@ use serde::Serialize; const BUCKET_ID_ERROR: &str = "field doesn't contains sharding key value"; const DUPLICATE_COLUMN_ERROR: &str = "duplicate column"; +const EMPTY_PLAN_RELATION: &str = "empty plan relations"; +const INCORRECT_BUCKET_ID_ERROR: &str = "incorrect bucket id"; const INVALID_BOOL_ERROR: &str = "invalid boolean"; const INVALID_NAME_ERROR: &str = "invalid name"; +const INVALID_NODE: &str = "invalid node"; const INVALID_NUMBER_ERROR: &str = "invalid number"; const INVALID_PLAN_ERROR: &str = "invalid plan"; const INVALID_RELATION_ERROR: &str = "invalid relation"; const INVALID_ROW_ERROR: &str = "invalid row"; const INVALID_SHARDING_KEY_ERROR: &str = "invalid sharding key"; const NOT_EQUAL_ROWS: &str = "not equal rows"; +const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented"; +const REQUIRE_MOTION: &str = "require motion"; const SERIALIZATION_ERROR: &str = "serialization"; const SIMPLE_QUERY_ERROR: &str = "query doesn't simple"; const SIMPLE_UNION_QUERY_ERROR: &str = "query doesn't simple union"; -const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented"; +const UNINITIALIZED_DISTRIBUTION: &str = "uninitialized distribution"; const VALUE_OUT_OF_RANGE_ERROR: &str = "value out of range"; -const INCORRECT_BUCKET_ID_ERROR: &str = "incorrect bucket id"; #[derive(Debug, Clone, PartialEq, Serialize)] pub enum QueryPlannerError { BucketIdError, DuplicateColumn, + EmptyPlanRelations, + IncorrectBucketIdError, InvalidBool, InvalidName, + InvalidNode, InvalidNumber, InvalidPlan, InvalidRelation, InvalidRow, InvalidShardingKey, NotEqualRows, + QueryNotImplemented, + RequireMotion, Serialization, SimpleQueryError, SimpleUnionQueryError, - QueryNotImplemented, + UninitializedDistribution, ValueOutOfRange, - IncorrectBucketIdError, } impl fmt::Display for QueryPlannerError { @@ -44,20 +52,24 @@ impl fmt::Display for QueryPlannerError { let p = match self { QueryPlannerError::BucketIdError => BUCKET_ID_ERROR, QueryPlannerError::DuplicateColumn => DUPLICATE_COLUMN_ERROR, + QueryPlannerError::EmptyPlanRelations => EMPTY_PLAN_RELATION, + QueryPlannerError::IncorrectBucketIdError => INCORRECT_BUCKET_ID_ERROR, QueryPlannerError::InvalidBool => INVALID_BOOL_ERROR, QueryPlannerError::InvalidName => INVALID_NAME_ERROR, + QueryPlannerError::InvalidNode => INVALID_NODE, QueryPlannerError::InvalidNumber => INVALID_NUMBER_ERROR, QueryPlannerError::InvalidPlan => INVALID_PLAN_ERROR, QueryPlannerError::InvalidRelation => INVALID_RELATION_ERROR, QueryPlannerError::InvalidRow => INVALID_ROW_ERROR, QueryPlannerError::InvalidShardingKey => INVALID_SHARDING_KEY_ERROR, QueryPlannerError::NotEqualRows => NOT_EQUAL_ROWS, + QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED, + QueryPlannerError::RequireMotion => REQUIRE_MOTION, QueryPlannerError::Serialization => SERIALIZATION_ERROR, QueryPlannerError::SimpleQueryError => SIMPLE_QUERY_ERROR, QueryPlannerError::SimpleUnionQueryError => SIMPLE_UNION_QUERY_ERROR, - QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED, + QueryPlannerError::UninitializedDistribution => UNINITIALIZED_DISTRIBUTION, QueryPlannerError::ValueOutOfRange => VALUE_OUT_OF_RANGE_ERROR, - QueryPlannerError::IncorrectBucketIdError => INCORRECT_BUCKET_ID_ERROR, }; write!(f, "{}", p) } diff --git a/src/ir.rs b/src/ir.rs index 9946783945..fda06da324 100644 --- a/src/ir.rs +++ b/src/ir.rs @@ -8,7 +8,7 @@ pub mod relation; pub mod value; use crate::errors::QueryPlannerError; -use expression::Expression; +use expression::{Branch, Distribution, Expression}; use operator::Relational; use relation::Table; use serde::{Deserialize, Serialize}; @@ -33,6 +33,156 @@ pub enum Node { Relational(Relational), } +/// Suggested distribution by the child relational node. +/// A wrapper for `Expression::suggest_distribution()`; +/// used by relational nodes when they calculate their +/// distribution. +/// +/// # Errors +/// Returns `QueryPlannerError`: +/// - parent node's output is not a valid tuple +/// - child node is not relational +/// - child's output is not a valid tuple +fn child_dist( + output: usize, + child: usize, + branch: &Branch, + plan: &Plan, +) -> Result<Distribution, QueryPlannerError> { + // Get current output tuple column list + let aliases: &Vec<usize> = + if let Node::Expression(Expression::Row { list, .. }) = plan.get_node(output)? { + Ok(list) + } else { + Err(QueryPlannerError::InvalidRow) + }?; + + // Distribution suggested by the child. + if let Node::Relational(child_node) = plan.get_node(child)? { + if let Node::Expression(child_row) = plan.get_node(child_node.output())? { + Ok(child_row.suggest_distribution(branch, aliases, plan)?) + } else { + Err(QueryPlannerError::InvalidRow) + } + } else { + Err(QueryPlannerError::InvalidPlan) + } +} + +/// Set output tuple distribution for the node. +/// +/// # Errors +/// Returns `QueryPlannerError`: +/// - when node position doesn't exist in the plan node arena +/// - for nodes, that don't produce tuples (all expressions except `Row`) +/// - for relational nodes with invalid output or children +pub fn set_distribution(pointer: usize, plan: &mut Plan) -> Result<(), QueryPlannerError> { + match plan.get_node(pointer)? { + Node::Relational(relational) => { + match relational { + Relational::ScanRelation { + relation: table_name, + .. + } => { + if let Some(relations) = &plan.relations { + if let Some(rel) = relations.get(table_name) { + // Update output tuple distribution to the relation's one. + match rel { + Table::Segment { key, .. } | Table::VirtualSegment { key, .. } => { + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = + Some(Distribution::Segment { key: key.clone() }); + return Ok(()); + } + return Err(QueryPlannerError::InvalidRow); + } + Table::Virtual { .. } => { + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = Some(Distribution::Random); + return Ok(()); + } + return Err(QueryPlannerError::InvalidRow); + } + } + } + return Err(QueryPlannerError::InvalidRelation); + } + Err(QueryPlannerError::EmptyPlanRelations) + } + Relational::Projection { child, output, .. } + | Relational::Selection { child, output, .. } + | Relational::ScanSubQuery { child, output, .. } => { + let dist = child_dist(*output, *child, &Branch::Left, plan)?; + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = Some(dist); + return Ok(()); + } + Err(QueryPlannerError::InvalidNode) + } + Relational::UnionAll { + left, + right, + output, + .. + } => { + let left_dist = child_dist(*output, *left, &Branch::Both, plan)?; + + let right_dist = child_dist(*output, *right, &Branch::Both, plan)?; + + let rel_tuple = relational.output(); + let node = plan + .nodes + .get_mut(rel_tuple) + .ok_or(QueryPlannerError::ValueOutOfRange)?; + if let Node::Expression(Expression::Row { + ref mut distribution, + .. + }) = node + { + *distribution = Some(Distribution::new_union(left_dist, right_dist)?); + return Ok(()); + } + Err(QueryPlannerError::InvalidRow) + } + // TODO: implement it! + Relational::InnerJoin { .. } | Relational::Motion { .. } => { + Err(QueryPlannerError::QueryNotImplemented) + } + } + } + // TODO: how should we implement it for the `Row`? + Node::Expression(_) => Err(QueryPlannerError::QueryNotImplemented), + } +} + /// Plan node "allocator". /// /// Inserts an element to the array and returns its position, diff --git a/src/ir/expression.rs b/src/ir/expression.rs index 7370ac5d2f..1cf579cb71 100644 --- a/src/ir/expression.rs +++ b/src/ir/expression.rs @@ -15,6 +15,10 @@ use std::collections::HashMap; /// Tuple data chunk distribution policy in the cluster. #[derive(Serialize, Deserialize, PartialEq, Debug)] pub enum Distribution { + /// Only coordinator contains all the data. + /// + /// Example: insertion to the virtual table on coordinator. + Coordinator, /// Each segment contains a random portion of data. /// /// Example: "erased" distribution key by projection operator. @@ -32,10 +36,49 @@ pub enum Distribution { /// that for a distribution key. key: Vec<usize>, }, - /// Only a single segment contains all the data. +} + +impl Distribution { + /// Calculate a new distribution for the `UnionAll` output tuple. /// - /// Example: insertion to the virtual table on coordinator. - Single, + /// # Errors + /// Returns `QueryPlannerError`: + /// - distribution conflict that should be resolved by adding a `Motion` node + pub fn new_union( + left: Distribution, + right: Distribution, + ) -> Result<Distribution, QueryPlannerError> { + match left { + Distribution::Coordinator => match right { + Distribution::Coordinator => Ok(left), + _ => Err(QueryPlannerError::RequireMotion), + }, + Distribution::Random => match right { + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + _ => Ok(left), + }, + Distribution::Replicated => match right { + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + _ => Ok(right), + }, + Distribution::Segment { + key: ref left_key, .. + } => match right { + Distribution::Random => Ok(Distribution::Random), + Distribution::Replicated => Ok(left), + Distribution::Segment { + key: ref right_key, .. + } => { + if left_key.iter().zip(right_key.iter()).all(|(l, r)| l == r) { + Ok(left) + } else { + Ok(Distribution::Random) + } + } + Distribution::Coordinator => Err(QueryPlannerError::RequireMotion), + }, + } + } } /// Tree branch. @@ -133,8 +176,9 @@ pub enum Expression { Row { /// A list of the alias expression node indexes in the plan node arena. list: Vec<usize>, - /// Resulting data distribution of the tuple. - distribution: Distribution, + /// Resulting data distribution of the tuple. Should be filled as a part + /// of the last "add Motion" transformation. + distribution: Option<Distribution>, }, } @@ -147,8 +191,9 @@ impl Expression { /// This function is executed on the child's side. /// /// # Errors - /// Returns `QueryPlannerError` when aliases are invalid. - pub fn suggested_distribution( + /// Returns `QueryPlannerError` when aliases are invalid or a node doesn't know its + /// distribution yet. + pub fn suggest_distribution( &self, my_branch: &Branch, aliases: &[usize], @@ -158,7 +203,12 @@ impl Expression { ref distribution, .. } = self { - match *distribution { + let dist = match distribution { + Some(d) => &*d, + None => return Err(QueryPlannerError::UninitializedDistribution), + }; + + match dist { Distribution::Random => return Ok(Distribution::Random), Distribution::Replicated => return Ok(Distribution::Replicated), Distribution::Segment { ref key } => { @@ -206,7 +256,7 @@ impl Expression { } return Ok(Distribution::Random); } - Distribution::Single => return Ok(Distribution::Single), + Distribution::Coordinator => return Ok(Distribution::Coordinator), } } Err(QueryPlannerError::InvalidRow) @@ -215,11 +265,15 @@ impl Expression { /// Get current row distribution. /// /// # Errors - /// Returns `QueryPlannerError` when the function is called on - /// expression other than `Row`. + /// Returns `QueryPlannerError` when the function is called on expression + /// other than `Row` or a node doesn't know its distribution yet. pub fn distribution(&self) -> Result<&Distribution, QueryPlannerError> { if let Expression::Row { distribution, .. } = self { - return Ok(distribution); + let dist = match distribution { + Some(d) => d, + None => return Err(QueryPlannerError::UninitializedDistribution), + }; + return Ok(dist); } Err(QueryPlannerError::InvalidRow) } @@ -248,7 +302,7 @@ impl Expression { // TODO: check that doesn't contain top-level aliases with the same names /// Row expression constructor. #[must_use] - pub fn new_row(list: Vec<usize>, distribution: Distribution) -> Self { + pub fn new_row(list: Vec<usize>, distribution: Option<Distribution>) -> Self { Expression::Row { list, distribution } } diff --git a/src/ir/expression/tests.rs b/src/ir/expression/tests.rs index cb19c74783..1057cdca82 100644 --- a/src/ir/expression/tests.rs +++ b/src/ir/expression/tests.rs @@ -5,7 +5,7 @@ use std::fs; use std::path::Path; #[test] -fn suggested_distribution() { +fn suggest_distribution() { // Load a table "t (a, b, c, d)" distributed by ["b", "a"] // with a sec scan and three additional alias-reference pairs // for "a", "b" and "c". We want to see, what suggestions would @@ -16,7 +16,7 @@ fn suggested_distribution() { .join("artifactory") .join("ir") .join("expression") - .join("suggested_distribution.yaml"); + .join("suggest_distribution.yaml"); let s = fs::read_to_string(path).unwrap(); let plan = Plan::from_yaml(&s).unwrap(); @@ -30,7 +30,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Segment { key: vec![1, 0] }, output - .suggested_distribution(&Branch::Left, &[a, b, c], &plan) + .suggest_distribution(&Branch::Left, &[a, b, c], &plan) .unwrap() ); @@ -38,7 +38,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Segment { key: vec![0, 1] }, output - .suggested_distribution(&Branch::Left, &[b, a], &plan) + .suggest_distribution(&Branch::Left, &[b, a], &plan) .unwrap() ); @@ -46,7 +46,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Random, output - .suggested_distribution(&Branch::Left, &[c, a], &plan) + .suggest_distribution(&Branch::Left, &[c, a], &plan) .unwrap() ); @@ -54,7 +54,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Random, output - .suggested_distribution(&Branch::Left, &[a], &plan) + .suggest_distribution(&Branch::Left, &[a], &plan) .unwrap() ); @@ -62,7 +62,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Segment { key: vec![1, 0] }, output - .suggested_distribution(&Branch::Both, &[a, b, c], &plan) + .suggest_distribution(&Branch::Both, &[a, b, c], &plan) .unwrap() ); @@ -70,7 +70,7 @@ fn suggested_distribution() { assert_eq!( Distribution::Random, output - .suggested_distribution(&Branch::Right, &[a, b, c], &plan) + .suggest_distribution(&Branch::Right, &[a, b, c], &plan) .unwrap() ); diff --git a/src/ir/operator.rs b/src/ir/operator.rs index 2bb4e66a99..8b6dd85515 100644 --- a/src/ir/operator.rs +++ b/src/ir/operator.rs @@ -1,6 +1,6 @@ //! Operators for expression transformations. -use super::expression::{Branch, Distribution, Expression}; +use super::expression::{Branch, Expression}; use super::relation::Table; use super::{vec_alloc, Node, Plan}; use crate::errors::QueryPlannerError; @@ -226,11 +226,7 @@ impl Relational { if let Some(relations) = &plan.relations { if let Some(rel) = relations.get(table_name) { match rel { - Table::Segment { - ref columns, - key, - name: _, - } => { + Table::Segment { ref columns, .. } => { let refs = columns .iter() .enumerate() @@ -247,10 +243,7 @@ impl Relational { return Ok(Relational::ScanRelation { output: vec_alloc( nodes, - Node::Expression(Expression::new_row( - refs, - Distribution::Segment { key: key.clone() }, - )), + Node::Expression(Expression::new_row(refs, None)), ), relation: String::from(table_name), }); @@ -266,7 +259,10 @@ impl Relational { /// New `Projection` constructor. /// /// # Errors - /// Returns `QueryPlannerError` when the child node is invalid. + /// Returns `QueryPlannerError`: + /// - child node is not relational + /// - child output tuple is invalid + /// - column name do not match the ones in the child output tuple pub fn new_proj( plan: &mut Plan, child: usize, @@ -274,32 +270,29 @@ impl Relational { ) -> Result<Self, QueryPlannerError> { let aliases = new_alias_nodes(plan, child, output, &Branch::Left)?; - if let Node::Relational(child_node) = plan.get_node(child)? { - if let Node::Expression(child_row) = plan.get_node(child_node.output())? { - let dist = child_row.suggested_distribution(&Branch::Left, &aliases, plan)?; - let new_output = vec_alloc( - &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), - ); - return Ok(Relational::Projection { - child, - output: new_output, - }); - } - } - Err(QueryPlannerError::InvalidPlan) + let new_output = vec_alloc( + &mut plan.nodes, + Node::Expression(Expression::new_row(aliases, None)), + ); + + Ok(Relational::Projection { + child, + output: new_output, + }) } /// New `Selection` constructor /// /// # Errors - /// Returns `QueryPlannerError` when the child or filter nodes are invalid. + /// Returns `QueryPlannerError`: + /// - filter expression is not boolean + /// - child node is not relational + /// - child output tuple is not valid pub fn new_select( plan: &mut Plan, child: usize, filter: usize, ) -> Result<Self, QueryPlannerError> { - // Check that filter node is a boolean expression. if let Node::Expression(Expression::Bool { .. }) = plan.get_node(filter)? { } else { return Err(QueryPlannerError::InvalidBool); @@ -310,32 +303,28 @@ impl Relational { } else { return Err(QueryPlannerError::InvalidRow); }; - let output: Vec<&str> = names.iter().map(|s| s as &str).collect(); let aliases = new_alias_nodes(plan, child, &output, &Branch::Left)?; - if let Node::Relational(child_node) = plan.get_node(child)? { - if let Node::Expression(child_row) = plan.get_node(child_node.output())? { - let dist = child_row.suggested_distribution(&Branch::Left, &aliases, plan)?; - let new_output = vec_alloc( - &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), - ); - return Ok(Relational::Selection { - child, - filter, - output: new_output, - }); - } - } - Err(QueryPlannerError::InvalidPlan) + let new_output = vec_alloc( + &mut plan.nodes, + Node::Expression(Expression::new_row(aliases, None)), + ); + + Ok(Relational::Selection { + child, + filter, + output: new_output, + }) } /// New `UnionAll` constructor. /// /// # Errors - /// Returns `QueryPlannerError` when the left or right nodes are invalid - /// or have a different column structure in the output tuples. + /// Returns `QueryPlannerError`: + /// - children nodes are not relational + /// - children tuples are invalid + /// - children tuples have mismatching structure pub fn new_union_all( plan: &mut Plan, left: usize, @@ -364,37 +353,11 @@ impl Relational { let col_names: Vec<&str> = left_names.iter().map(|s| s as &str).collect(); let aliases = new_alias_nodes(plan, left, &col_names, &Branch::Both)?; - let left_dist = if let Node::Relational(left_node) = plan.get_node(left)? { - match plan.get_node(left_node.output())? { - Node::Expression(left_row) => { - left_row.suggested_distribution(&Branch::Left, &aliases, plan)? - } - Node::Relational(_) => return Err(QueryPlannerError::InvalidRow), - } - } else { - return Err(QueryPlannerError::InvalidPlan); - }; - - let right_dist = if let Node::Relational(right_node) = plan.get_node(right)? { - match plan.get_node(right_node.output())? { - Node::Expression(right_row) => { - right_row.suggested_distribution(&Branch::Right, &aliases, plan)? - } - Node::Relational(_) => return Err(QueryPlannerError::InvalidRow), - } - } else { - return Err(QueryPlannerError::InvalidPlan); - }; - - let dist = if left_dist == right_dist { - left_dist - } else { - Distribution::Random - }; let output = vec_alloc( &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), + Node::Expression(Expression::new_row(aliases, None)), ); + Ok(Relational::UnionAll { left, right, @@ -405,7 +368,10 @@ impl Relational { /// New `ScanSubQuery` constructor. /// /// # Errors - /// Returns `QueryPlannerError` when the child node is invalid. + /// Returns `QueryPlannerError`: + /// - child node is not relational + /// - child node output is not a correct tuple + /// - `SubQuery` name is empty pub fn new_sub_query( plan: &mut Plan, child: usize, @@ -416,7 +382,6 @@ impl Relational { } else { return Err(QueryPlannerError::InvalidRow); }; - if alias.is_empty() { return Err(QueryPlannerError::InvalidName); } @@ -424,20 +389,9 @@ impl Relational { let col_names: Vec<&str> = names.iter().map(|s| s as &str).collect(); let aliases = new_alias_nodes(plan, child, &col_names, &Branch::Both)?; - let dist = if let Node::Relational(left_node) = plan.get_node(child)? { - match plan.get_node(left_node.output())? { - Node::Expression(left_row) => { - left_row.suggested_distribution(&Branch::Left, &aliases, plan)? - } - Node::Relational(_) => return Err(QueryPlannerError::InvalidRow), - } - } else { - return Err(QueryPlannerError::InvalidPlan); - }; - let output = vec_alloc( &mut plan.nodes, - Node::Expression(Expression::new_row(aliases, dist)), + Node::Expression(Expression::new_row(aliases, None)), ); Ok(Relational::ScanSubQuery { diff --git a/src/ir/operator/tests.rs b/src/ir/operator/tests.rs index b501170b40..f94ca8d28c 100644 --- a/src/ir/operator/tests.rs +++ b/src/ir/operator/tests.rs @@ -3,6 +3,7 @@ use crate::errors::QueryPlannerError; use crate::ir::expression::*; use crate::ir::relation::*; use crate::ir::value::*; +use crate::ir::*; use itertools::Itertools; use pretty_assertions::assert_eq; use std::fs; @@ -33,7 +34,9 @@ fn scan_rel() { }, scan ); + assert_eq!(9, vec_alloc(&mut plan.nodes, Node::Relational(scan))); + set_distribution(9, &mut plan).unwrap(); if let Node::Expression(row) = plan.get_node(8).unwrap() { assert_eq!( *row.distribution().unwrap(), @@ -42,8 +45,6 @@ fn scan_rel() { } else { panic!("Wrong output node type!"); } - - assert_eq!(9, vec_alloc(&mut plan.nodes, Node::Relational(scan))); } #[test] @@ -66,6 +67,7 @@ fn scan_rel_serialized() { let scan = Relational::new_scan("t", &mut plan).unwrap(); plan.nodes.push(Node::Relational(scan)); plan.top = Some(9); + set_distribution(plan.top.unwrap(), &mut plan).unwrap(); let path = Path::new("") .join("tests") @@ -96,6 +98,8 @@ fn projection() { let scan = Relational::new_scan("t", &mut plan).unwrap(); let scan_id = vec_alloc(&mut plan.nodes, Node::Relational(scan)); + set_distribution(scan_id, &mut plan).unwrap(); + let proj_seg = Relational::new_proj(&mut plan, scan_id, &["b", "a"]).unwrap(); assert_eq!( Relational::Projection { @@ -104,6 +108,8 @@ fn projection() { }, proj_seg ); + let proj_seg_id = vec_alloc(&mut plan.nodes, Node::Relational(proj_seg)); + set_distribution(proj_seg_id, &mut plan).unwrap(); if let Node::Expression(row) = plan.get_node(14).unwrap() { assert_eq!( @@ -116,12 +122,14 @@ fn projection() { assert_eq!( Relational::Projection { child: scan_id, - output: 19 + output: 20 }, proj_rand ); + let proj_rand_id = vec_alloc(&mut plan.nodes, Node::Relational(proj_rand)); + set_distribution(proj_rand_id, &mut plan).unwrap(); - if let Node::Expression(row) = plan.get_node(19).unwrap() { + if let Node::Expression(row) = plan.get_node(20).unwrap() { assert_eq!(*row.distribution().unwrap(), Distribution::Random); } @@ -238,6 +246,7 @@ fn union_all() { let scan_t1 = Relational::new_scan("t1", &mut plan).unwrap(); let scan_t1_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t1)); + set_distribution(scan_t1_id, &mut plan).unwrap(); // Check fallback to random distribution let t2 = Table::new_seg( @@ -253,26 +262,42 @@ fn union_all() { let scan_t2 = Relational::new_scan("t2", &mut plan).unwrap(); let scan_t2_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t2)); + set_distribution(scan_t2_id, &mut plan).unwrap(); let union_all = Relational::new_union_all(&mut plan, scan_t1_id, scan_t2_id).unwrap(); - if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { - assert_eq!(Distribution::Random, *row.distribution().unwrap()); + let union_all_id = vec_alloc(&mut plan.nodes, Node::Relational(union_all)); + set_distribution(union_all_id, &mut plan).unwrap(); + + if let Node::Relational(union_all) = plan.get_node(union_all_id).unwrap() { + if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { + assert_eq!(Distribution::Random, *row.distribution().unwrap()); + } else { + panic!("Invalid output!"); + } } else { - panic!("Invalid output!"); + panic!("Invalid node!"); } // Check preserving the original distribution let scan_t3 = Relational::new_scan("t1", &mut plan).unwrap(); let scan_t3_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t3)); + set_distribution(scan_t3_id, &mut plan).unwrap(); let union_all = Relational::new_union_all(&mut plan, scan_t1_id, scan_t3_id).unwrap(); - if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { - assert_eq!( - Distribution::Segment { key: vec![0] }, - *row.distribution().unwrap() - ); + let union_all_id = vec_alloc(&mut plan.nodes, Node::Relational(union_all)); + set_distribution(union_all_id, &mut plan).unwrap(); + + if let Node::Relational(union_all) = plan.get_node(union_all_id).unwrap() { + if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() { + assert_eq!( + Distribution::Segment { key: vec![0] }, + *row.distribution().unwrap() + ); + } else { + panic!("Invalid output!"); + } } else { - panic!("Invalid output!"); + panic!("Invalid node!"); } // Check errors for children with different column names diff --git a/tests/artifactory/ir/expression/suggested_distribution.yaml b/tests/artifactory/ir/expression/suggest_distribution.yaml similarity index 100% rename from tests/artifactory/ir/expression/suggested_distribution.yaml rename to tests/artifactory/ir/expression/suggest_distribution.yaml -- GitLab