Skip to content
Snippets Groups Projects
Commit 9c3fe605 authored by Denis Smirnov's avatar Denis Smirnov
Browse files

feat: calculate tuple distribution in a separate function

Previously distribution was calculated in the node constructor. It
is a wrong design as the tree would be transformed multiple times
and there is no reason to recalculate distribution on each step.
We need distribution only on the last transformation when we insert
Motion nodes. So, this refactoring makes tuple distribution optional
and moves distribution logic to a separate function 'set_distribution()'.
parent 3c03a5eb
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -4,39 +4,47 @@ use serde::Serialize;
const BUCKET_ID_ERROR: &str = "field doesn't contains sharding key value";
const DUPLICATE_COLUMN_ERROR: &str = "duplicate column";
const EMPTY_PLAN_RELATION: &str = "empty plan relations";
const INCORRECT_BUCKET_ID_ERROR: &str = "incorrect bucket id";
const INVALID_BOOL_ERROR: &str = "invalid boolean";
const INVALID_NAME_ERROR: &str = "invalid name";
const INVALID_NODE: &str = "invalid node";
const INVALID_NUMBER_ERROR: &str = "invalid number";
const INVALID_PLAN_ERROR: &str = "invalid plan";
const INVALID_RELATION_ERROR: &str = "invalid relation";
const INVALID_ROW_ERROR: &str = "invalid row";
const INVALID_SHARDING_KEY_ERROR: &str = "invalid sharding key";
const NOT_EQUAL_ROWS: &str = "not equal rows";
const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented";
const REQUIRE_MOTION: &str = "require motion";
const SERIALIZATION_ERROR: &str = "serialization";
const SIMPLE_QUERY_ERROR: &str = "query doesn't simple";
const SIMPLE_UNION_QUERY_ERROR: &str = "query doesn't simple union";
const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented";
const UNINITIALIZED_DISTRIBUTION: &str = "uninitialized distribution";
const VALUE_OUT_OF_RANGE_ERROR: &str = "value out of range";
const INCORRECT_BUCKET_ID_ERROR: &str = "incorrect bucket id";
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum QueryPlannerError {
BucketIdError,
DuplicateColumn,
EmptyPlanRelations,
IncorrectBucketIdError,
InvalidBool,
InvalidName,
InvalidNode,
InvalidNumber,
InvalidPlan,
InvalidRelation,
InvalidRow,
InvalidShardingKey,
NotEqualRows,
QueryNotImplemented,
RequireMotion,
Serialization,
SimpleQueryError,
SimpleUnionQueryError,
QueryNotImplemented,
UninitializedDistribution,
ValueOutOfRange,
IncorrectBucketIdError,
}
impl fmt::Display for QueryPlannerError {
......@@ -44,20 +52,24 @@ impl fmt::Display for QueryPlannerError {
let p = match self {
QueryPlannerError::BucketIdError => BUCKET_ID_ERROR,
QueryPlannerError::DuplicateColumn => DUPLICATE_COLUMN_ERROR,
QueryPlannerError::EmptyPlanRelations => EMPTY_PLAN_RELATION,
QueryPlannerError::IncorrectBucketIdError => INCORRECT_BUCKET_ID_ERROR,
QueryPlannerError::InvalidBool => INVALID_BOOL_ERROR,
QueryPlannerError::InvalidName => INVALID_NAME_ERROR,
QueryPlannerError::InvalidNode => INVALID_NODE,
QueryPlannerError::InvalidNumber => INVALID_NUMBER_ERROR,
QueryPlannerError::InvalidPlan => INVALID_PLAN_ERROR,
QueryPlannerError::InvalidRelation => INVALID_RELATION_ERROR,
QueryPlannerError::InvalidRow => INVALID_ROW_ERROR,
QueryPlannerError::InvalidShardingKey => INVALID_SHARDING_KEY_ERROR,
QueryPlannerError::NotEqualRows => NOT_EQUAL_ROWS,
QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED,
QueryPlannerError::RequireMotion => REQUIRE_MOTION,
QueryPlannerError::Serialization => SERIALIZATION_ERROR,
QueryPlannerError::SimpleQueryError => SIMPLE_QUERY_ERROR,
QueryPlannerError::SimpleUnionQueryError => SIMPLE_UNION_QUERY_ERROR,
QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED,
QueryPlannerError::UninitializedDistribution => UNINITIALIZED_DISTRIBUTION,
QueryPlannerError::ValueOutOfRange => VALUE_OUT_OF_RANGE_ERROR,
QueryPlannerError::IncorrectBucketIdError => INCORRECT_BUCKET_ID_ERROR,
};
write!(f, "{}", p)
}
......
......@@ -8,7 +8,7 @@ pub mod relation;
pub mod value;
use crate::errors::QueryPlannerError;
use expression::Expression;
use expression::{Branch, Distribution, Expression};
use operator::Relational;
use relation::Table;
use serde::{Deserialize, Serialize};
......@@ -33,6 +33,156 @@ pub enum Node {
Relational(Relational),
}
/// Suggested distribution by the child relational node.
/// A wrapper for `Expression::suggest_distribution()`;
/// used by relational nodes when they calculate their
/// distribution.
///
/// # Errors
/// Returns `QueryPlannerError`:
/// - parent node's output is not a valid tuple
/// - child node is not relational
/// - child's output is not a valid tuple
fn child_dist(
output: usize,
child: usize,
branch: &Branch,
plan: &Plan,
) -> Result<Distribution, QueryPlannerError> {
// Get current output tuple column list
let aliases: &Vec<usize> =
if let Node::Expression(Expression::Row { list, .. }) = plan.get_node(output)? {
Ok(list)
} else {
Err(QueryPlannerError::InvalidRow)
}?;
// Distribution suggested by the child.
if let Node::Relational(child_node) = plan.get_node(child)? {
if let Node::Expression(child_row) = plan.get_node(child_node.output())? {
Ok(child_row.suggest_distribution(branch, aliases, plan)?)
} else {
Err(QueryPlannerError::InvalidRow)
}
} else {
Err(QueryPlannerError::InvalidPlan)
}
}
/// Set output tuple distribution for the node.
///
/// # Errors
/// Returns `QueryPlannerError`:
/// - when node position doesn't exist in the plan node arena
/// - for nodes, that don't produce tuples (all expressions except `Row`)
/// - for relational nodes with invalid output or children
pub fn set_distribution(pointer: usize, plan: &mut Plan) -> Result<(), QueryPlannerError> {
match plan.get_node(pointer)? {
Node::Relational(relational) => {
match relational {
Relational::ScanRelation {
relation: table_name,
..
} => {
if let Some(relations) = &plan.relations {
if let Some(rel) = relations.get(table_name) {
// Update output tuple distribution to the relation's one.
match rel {
Table::Segment { key, .. } | Table::VirtualSegment { key, .. } => {
let rel_tuple = relational.output();
let node = plan
.nodes
.get_mut(rel_tuple)
.ok_or(QueryPlannerError::ValueOutOfRange)?;
if let Node::Expression(Expression::Row {
ref mut distribution,
..
}) = node
{
*distribution =
Some(Distribution::Segment { key: key.clone() });
return Ok(());
}
return Err(QueryPlannerError::InvalidRow);
}
Table::Virtual { .. } => {
let rel_tuple = relational.output();
let node = plan
.nodes
.get_mut(rel_tuple)
.ok_or(QueryPlannerError::ValueOutOfRange)?;
if let Node::Expression(Expression::Row {
ref mut distribution,
..
}) = node
{
*distribution = Some(Distribution::Random);
return Ok(());
}
return Err(QueryPlannerError::InvalidRow);
}
}
}
return Err(QueryPlannerError::InvalidRelation);
}
Err(QueryPlannerError::EmptyPlanRelations)
}
Relational::Projection { child, output, .. }
| Relational::Selection { child, output, .. }
| Relational::ScanSubQuery { child, output, .. } => {
let dist = child_dist(*output, *child, &Branch::Left, plan)?;
let rel_tuple = relational.output();
let node = plan
.nodes
.get_mut(rel_tuple)
.ok_or(QueryPlannerError::ValueOutOfRange)?;
if let Node::Expression(Expression::Row {
ref mut distribution,
..
}) = node
{
*distribution = Some(dist);
return Ok(());
}
Err(QueryPlannerError::InvalidNode)
}
Relational::UnionAll {
left,
right,
output,
..
} => {
let left_dist = child_dist(*output, *left, &Branch::Both, plan)?;
let right_dist = child_dist(*output, *right, &Branch::Both, plan)?;
let rel_tuple = relational.output();
let node = plan
.nodes
.get_mut(rel_tuple)
.ok_or(QueryPlannerError::ValueOutOfRange)?;
if let Node::Expression(Expression::Row {
ref mut distribution,
..
}) = node
{
*distribution = Some(Distribution::new_union(left_dist, right_dist)?);
return Ok(());
}
Err(QueryPlannerError::InvalidRow)
}
// TODO: implement it!
Relational::InnerJoin { .. } | Relational::Motion { .. } => {
Err(QueryPlannerError::QueryNotImplemented)
}
}
}
// TODO: how should we implement it for the `Row`?
Node::Expression(_) => Err(QueryPlannerError::QueryNotImplemented),
}
}
/// Plan node "allocator".
///
/// Inserts an element to the array and returns its position,
......
......@@ -15,6 +15,10 @@ use std::collections::HashMap;
/// Tuple data chunk distribution policy in the cluster.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub enum Distribution {
/// Only coordinator contains all the data.
///
/// Example: insertion to the virtual table on coordinator.
Coordinator,
/// Each segment contains a random portion of data.
///
/// Example: "erased" distribution key by projection operator.
......@@ -32,10 +36,49 @@ pub enum Distribution {
/// that for a distribution key.
key: Vec<usize>,
},
/// Only a single segment contains all the data.
}
impl Distribution {
/// Calculate a new distribution for the `UnionAll` output tuple.
///
/// Example: insertion to the virtual table on coordinator.
Single,
/// # Errors
/// Returns `QueryPlannerError`:
/// - distribution conflict that should be resolved by adding a `Motion` node
pub fn new_union(
left: Distribution,
right: Distribution,
) -> Result<Distribution, QueryPlannerError> {
match left {
Distribution::Coordinator => match right {
Distribution::Coordinator => Ok(left),
_ => Err(QueryPlannerError::RequireMotion),
},
Distribution::Random => match right {
Distribution::Coordinator => Err(QueryPlannerError::RequireMotion),
_ => Ok(left),
},
Distribution::Replicated => match right {
Distribution::Coordinator => Err(QueryPlannerError::RequireMotion),
_ => Ok(right),
},
Distribution::Segment {
key: ref left_key, ..
} => match right {
Distribution::Random => Ok(Distribution::Random),
Distribution::Replicated => Ok(left),
Distribution::Segment {
key: ref right_key, ..
} => {
if left_key.iter().zip(right_key.iter()).all(|(l, r)| l == r) {
Ok(left)
} else {
Ok(Distribution::Random)
}
}
Distribution::Coordinator => Err(QueryPlannerError::RequireMotion),
},
}
}
}
/// Tree branch.
......@@ -133,8 +176,9 @@ pub enum Expression {
Row {
/// A list of the alias expression node indexes in the plan node arena.
list: Vec<usize>,
/// Resulting data distribution of the tuple.
distribution: Distribution,
/// Resulting data distribution of the tuple. Should be filled as a part
/// of the last "add Motion" transformation.
distribution: Option<Distribution>,
},
}
......@@ -147,8 +191,9 @@ impl Expression {
/// This function is executed on the child's side.
///
/// # Errors
/// Returns `QueryPlannerError` when aliases are invalid.
pub fn suggested_distribution(
/// Returns `QueryPlannerError` when aliases are invalid or a node doesn't know its
/// distribution yet.
pub fn suggest_distribution(
&self,
my_branch: &Branch,
aliases: &[usize],
......@@ -158,7 +203,12 @@ impl Expression {
ref distribution, ..
} = self
{
match *distribution {
let dist = match distribution {
Some(d) => &*d,
None => return Err(QueryPlannerError::UninitializedDistribution),
};
match dist {
Distribution::Random => return Ok(Distribution::Random),
Distribution::Replicated => return Ok(Distribution::Replicated),
Distribution::Segment { ref key } => {
......@@ -206,7 +256,7 @@ impl Expression {
}
return Ok(Distribution::Random);
}
Distribution::Single => return Ok(Distribution::Single),
Distribution::Coordinator => return Ok(Distribution::Coordinator),
}
}
Err(QueryPlannerError::InvalidRow)
......@@ -215,11 +265,15 @@ impl Expression {
/// Get current row distribution.
///
/// # Errors
/// Returns `QueryPlannerError` when the function is called on
/// expression other than `Row`.
/// Returns `QueryPlannerError` when the function is called on expression
/// other than `Row` or a node doesn't know its distribution yet.
pub fn distribution(&self) -> Result<&Distribution, QueryPlannerError> {
if let Expression::Row { distribution, .. } = self {
return Ok(distribution);
let dist = match distribution {
Some(d) => d,
None => return Err(QueryPlannerError::UninitializedDistribution),
};
return Ok(dist);
}
Err(QueryPlannerError::InvalidRow)
}
......@@ -248,7 +302,7 @@ impl Expression {
// TODO: check that doesn't contain top-level aliases with the same names
/// Row expression constructor.
#[must_use]
pub fn new_row(list: Vec<usize>, distribution: Distribution) -> Self {
pub fn new_row(list: Vec<usize>, distribution: Option<Distribution>) -> Self {
Expression::Row { list, distribution }
}
......
......@@ -5,7 +5,7 @@ use std::fs;
use std::path::Path;
#[test]
fn suggested_distribution() {
fn suggest_distribution() {
// Load a table "t (a, b, c, d)" distributed by ["b", "a"]
// with a sec scan and three additional alias-reference pairs
// for "a", "b" and "c". We want to see, what suggestions would
......@@ -16,7 +16,7 @@ fn suggested_distribution() {
.join("artifactory")
.join("ir")
.join("expression")
.join("suggested_distribution.yaml");
.join("suggest_distribution.yaml");
let s = fs::read_to_string(path).unwrap();
let plan = Plan::from_yaml(&s).unwrap();
......@@ -30,7 +30,7 @@ fn suggested_distribution() {
assert_eq!(
Distribution::Segment { key: vec![1, 0] },
output
.suggested_distribution(&Branch::Left, &[a, b, c], &plan)
.suggest_distribution(&Branch::Left, &[a, b, c], &plan)
.unwrap()
);
......@@ -38,7 +38,7 @@ fn suggested_distribution() {
assert_eq!(
Distribution::Segment { key: vec![0, 1] },
output
.suggested_distribution(&Branch::Left, &[b, a], &plan)
.suggest_distribution(&Branch::Left, &[b, a], &plan)
.unwrap()
);
......@@ -46,7 +46,7 @@ fn suggested_distribution() {
assert_eq!(
Distribution::Random,
output
.suggested_distribution(&Branch::Left, &[c, a], &plan)
.suggest_distribution(&Branch::Left, &[c, a], &plan)
.unwrap()
);
......@@ -54,7 +54,7 @@ fn suggested_distribution() {
assert_eq!(
Distribution::Random,
output
.suggested_distribution(&Branch::Left, &[a], &plan)
.suggest_distribution(&Branch::Left, &[a], &plan)
.unwrap()
);
......@@ -62,7 +62,7 @@ fn suggested_distribution() {
assert_eq!(
Distribution::Segment { key: vec![1, 0] },
output
.suggested_distribution(&Branch::Both, &[a, b, c], &plan)
.suggest_distribution(&Branch::Both, &[a, b, c], &plan)
.unwrap()
);
......@@ -70,7 +70,7 @@ fn suggested_distribution() {
assert_eq!(
Distribution::Random,
output
.suggested_distribution(&Branch::Right, &[a, b, c], &plan)
.suggest_distribution(&Branch::Right, &[a, b, c], &plan)
.unwrap()
);
......
//! Operators for expression transformations.
use super::expression::{Branch, Distribution, Expression};
use super::expression::{Branch, Expression};
use super::relation::Table;
use super::{vec_alloc, Node, Plan};
use crate::errors::QueryPlannerError;
......@@ -226,11 +226,7 @@ impl Relational {
if let Some(relations) = &plan.relations {
if let Some(rel) = relations.get(table_name) {
match rel {
Table::Segment {
ref columns,
key,
name: _,
} => {
Table::Segment { ref columns, .. } => {
let refs = columns
.iter()
.enumerate()
......@@ -247,10 +243,7 @@ impl Relational {
return Ok(Relational::ScanRelation {
output: vec_alloc(
nodes,
Node::Expression(Expression::new_row(
refs,
Distribution::Segment { key: key.clone() },
)),
Node::Expression(Expression::new_row(refs, None)),
),
relation: String::from(table_name),
});
......@@ -266,7 +259,10 @@ impl Relational {
/// New `Projection` constructor.
///
/// # Errors
/// Returns `QueryPlannerError` when the child node is invalid.
/// Returns `QueryPlannerError`:
/// - child node is not relational
/// - child output tuple is invalid
/// - column name do not match the ones in the child output tuple
pub fn new_proj(
plan: &mut Plan,
child: usize,
......@@ -274,32 +270,29 @@ impl Relational {
) -> Result<Self, QueryPlannerError> {
let aliases = new_alias_nodes(plan, child, output, &Branch::Left)?;
if let Node::Relational(child_node) = plan.get_node(child)? {
if let Node::Expression(child_row) = plan.get_node(child_node.output())? {
let dist = child_row.suggested_distribution(&Branch::Left, &aliases, plan)?;
let new_output = vec_alloc(
&mut plan.nodes,
Node::Expression(Expression::new_row(aliases, dist)),
);
return Ok(Relational::Projection {
child,
output: new_output,
});
}
}
Err(QueryPlannerError::InvalidPlan)
let new_output = vec_alloc(
&mut plan.nodes,
Node::Expression(Expression::new_row(aliases, None)),
);
Ok(Relational::Projection {
child,
output: new_output,
})
}
/// New `Selection` constructor
///
/// # Errors
/// Returns `QueryPlannerError` when the child or filter nodes are invalid.
/// Returns `QueryPlannerError`:
/// - filter expression is not boolean
/// - child node is not relational
/// - child output tuple is not valid
pub fn new_select(
plan: &mut Plan,
child: usize,
filter: usize,
) -> Result<Self, QueryPlannerError> {
// Check that filter node is a boolean expression.
if let Node::Expression(Expression::Bool { .. }) = plan.get_node(filter)? {
} else {
return Err(QueryPlannerError::InvalidBool);
......@@ -310,32 +303,28 @@ impl Relational {
} else {
return Err(QueryPlannerError::InvalidRow);
};
let output: Vec<&str> = names.iter().map(|s| s as &str).collect();
let aliases = new_alias_nodes(plan, child, &output, &Branch::Left)?;
if let Node::Relational(child_node) = plan.get_node(child)? {
if let Node::Expression(child_row) = plan.get_node(child_node.output())? {
let dist = child_row.suggested_distribution(&Branch::Left, &aliases, plan)?;
let new_output = vec_alloc(
&mut plan.nodes,
Node::Expression(Expression::new_row(aliases, dist)),
);
return Ok(Relational::Selection {
child,
filter,
output: new_output,
});
}
}
Err(QueryPlannerError::InvalidPlan)
let new_output = vec_alloc(
&mut plan.nodes,
Node::Expression(Expression::new_row(aliases, None)),
);
Ok(Relational::Selection {
child,
filter,
output: new_output,
})
}
/// New `UnionAll` constructor.
///
/// # Errors
/// Returns `QueryPlannerError` when the left or right nodes are invalid
/// or have a different column structure in the output tuples.
/// Returns `QueryPlannerError`:
/// - children nodes are not relational
/// - children tuples are invalid
/// - children tuples have mismatching structure
pub fn new_union_all(
plan: &mut Plan,
left: usize,
......@@ -364,37 +353,11 @@ impl Relational {
let col_names: Vec<&str> = left_names.iter().map(|s| s as &str).collect();
let aliases = new_alias_nodes(plan, left, &col_names, &Branch::Both)?;
let left_dist = if let Node::Relational(left_node) = plan.get_node(left)? {
match plan.get_node(left_node.output())? {
Node::Expression(left_row) => {
left_row.suggested_distribution(&Branch::Left, &aliases, plan)?
}
Node::Relational(_) => return Err(QueryPlannerError::InvalidRow),
}
} else {
return Err(QueryPlannerError::InvalidPlan);
};
let right_dist = if let Node::Relational(right_node) = plan.get_node(right)? {
match plan.get_node(right_node.output())? {
Node::Expression(right_row) => {
right_row.suggested_distribution(&Branch::Right, &aliases, plan)?
}
Node::Relational(_) => return Err(QueryPlannerError::InvalidRow),
}
} else {
return Err(QueryPlannerError::InvalidPlan);
};
let dist = if left_dist == right_dist {
left_dist
} else {
Distribution::Random
};
let output = vec_alloc(
&mut plan.nodes,
Node::Expression(Expression::new_row(aliases, dist)),
Node::Expression(Expression::new_row(aliases, None)),
);
Ok(Relational::UnionAll {
left,
right,
......@@ -405,7 +368,10 @@ impl Relational {
/// New `ScanSubQuery` constructor.
///
/// # Errors
/// Returns `QueryPlannerError` when the child node is invalid.
/// Returns `QueryPlannerError`:
/// - child node is not relational
/// - child node output is not a correct tuple
/// - `SubQuery` name is empty
pub fn new_sub_query(
plan: &mut Plan,
child: usize,
......@@ -416,7 +382,6 @@ impl Relational {
} else {
return Err(QueryPlannerError::InvalidRow);
};
if alias.is_empty() {
return Err(QueryPlannerError::InvalidName);
}
......@@ -424,20 +389,9 @@ impl Relational {
let col_names: Vec<&str> = names.iter().map(|s| s as &str).collect();
let aliases = new_alias_nodes(plan, child, &col_names, &Branch::Both)?;
let dist = if let Node::Relational(left_node) = plan.get_node(child)? {
match plan.get_node(left_node.output())? {
Node::Expression(left_row) => {
left_row.suggested_distribution(&Branch::Left, &aliases, plan)?
}
Node::Relational(_) => return Err(QueryPlannerError::InvalidRow),
}
} else {
return Err(QueryPlannerError::InvalidPlan);
};
let output = vec_alloc(
&mut plan.nodes,
Node::Expression(Expression::new_row(aliases, dist)),
Node::Expression(Expression::new_row(aliases, None)),
);
Ok(Relational::ScanSubQuery {
......
......@@ -3,6 +3,7 @@ use crate::errors::QueryPlannerError;
use crate::ir::expression::*;
use crate::ir::relation::*;
use crate::ir::value::*;
use crate::ir::*;
use itertools::Itertools;
use pretty_assertions::assert_eq;
use std::fs;
......@@ -33,7 +34,9 @@ fn scan_rel() {
},
scan
);
assert_eq!(9, vec_alloc(&mut plan.nodes, Node::Relational(scan)));
set_distribution(9, &mut plan).unwrap();
if let Node::Expression(row) = plan.get_node(8).unwrap() {
assert_eq!(
*row.distribution().unwrap(),
......@@ -42,8 +45,6 @@ fn scan_rel() {
} else {
panic!("Wrong output node type!");
}
assert_eq!(9, vec_alloc(&mut plan.nodes, Node::Relational(scan)));
}
#[test]
......@@ -66,6 +67,7 @@ fn scan_rel_serialized() {
let scan = Relational::new_scan("t", &mut plan).unwrap();
plan.nodes.push(Node::Relational(scan));
plan.top = Some(9);
set_distribution(plan.top.unwrap(), &mut plan).unwrap();
let path = Path::new("")
.join("tests")
......@@ -96,6 +98,8 @@ fn projection() {
let scan = Relational::new_scan("t", &mut plan).unwrap();
let scan_id = vec_alloc(&mut plan.nodes, Node::Relational(scan));
set_distribution(scan_id, &mut plan).unwrap();
let proj_seg = Relational::new_proj(&mut plan, scan_id, &["b", "a"]).unwrap();
assert_eq!(
Relational::Projection {
......@@ -104,6 +108,8 @@ fn projection() {
},
proj_seg
);
let proj_seg_id = vec_alloc(&mut plan.nodes, Node::Relational(proj_seg));
set_distribution(proj_seg_id, &mut plan).unwrap();
if let Node::Expression(row) = plan.get_node(14).unwrap() {
assert_eq!(
......@@ -116,12 +122,14 @@ fn projection() {
assert_eq!(
Relational::Projection {
child: scan_id,
output: 19
output: 20
},
proj_rand
);
let proj_rand_id = vec_alloc(&mut plan.nodes, Node::Relational(proj_rand));
set_distribution(proj_rand_id, &mut plan).unwrap();
if let Node::Expression(row) = plan.get_node(19).unwrap() {
if let Node::Expression(row) = plan.get_node(20).unwrap() {
assert_eq!(*row.distribution().unwrap(), Distribution::Random);
}
......@@ -238,6 +246,7 @@ fn union_all() {
let scan_t1 = Relational::new_scan("t1", &mut plan).unwrap();
let scan_t1_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t1));
set_distribution(scan_t1_id, &mut plan).unwrap();
// Check fallback to random distribution
let t2 = Table::new_seg(
......@@ -253,26 +262,42 @@ fn union_all() {
let scan_t2 = Relational::new_scan("t2", &mut plan).unwrap();
let scan_t2_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t2));
set_distribution(scan_t2_id, &mut plan).unwrap();
let union_all = Relational::new_union_all(&mut plan, scan_t1_id, scan_t2_id).unwrap();
if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() {
assert_eq!(Distribution::Random, *row.distribution().unwrap());
let union_all_id = vec_alloc(&mut plan.nodes, Node::Relational(union_all));
set_distribution(union_all_id, &mut plan).unwrap();
if let Node::Relational(union_all) = plan.get_node(union_all_id).unwrap() {
if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() {
assert_eq!(Distribution::Random, *row.distribution().unwrap());
} else {
panic!("Invalid output!");
}
} else {
panic!("Invalid output!");
panic!("Invalid node!");
}
// Check preserving the original distribution
let scan_t3 = Relational::new_scan("t1", &mut plan).unwrap();
let scan_t3_id = vec_alloc(&mut plan.nodes, Node::Relational(scan_t3));
set_distribution(scan_t3_id, &mut plan).unwrap();
let union_all = Relational::new_union_all(&mut plan, scan_t1_id, scan_t3_id).unwrap();
if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() {
assert_eq!(
Distribution::Segment { key: vec![0] },
*row.distribution().unwrap()
);
let union_all_id = vec_alloc(&mut plan.nodes, Node::Relational(union_all));
set_distribution(union_all_id, &mut plan).unwrap();
if let Node::Relational(union_all) = plan.get_node(union_all_id).unwrap() {
if let Node::Expression(row) = plan.get_node(union_all.output()).unwrap() {
assert_eq!(
Distribution::Segment { key: vec![0] },
*row.distribution().unwrap()
);
} else {
panic!("Invalid output!");
}
} else {
panic!("Invalid output!");
panic!("Invalid node!");
}
// Check errors for children with different column names
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment