Skip to content
Snippets Groups Projects
Commit c8c5dc08 authored by Denis Smirnov's avatar Denis Smirnov
Browse files

feat: reduce the amount of the distribution types

At the moment it seems that we are enough with two distribution
types: by key (segment) and not by key (any). So, remove all other
types as redundant.
parent bc6f24bb
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -29,104 +29,72 @@ impl Key {
}
}
/// Tuple data chunk distribution policy in the cluster.
/// Tuple distribution in the cluster.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub enum Distribution {
/// Only coordinator contains all the data.
///
/// Example: insertion to the virtual table on coordinator.
Coordinator,
/// Each segment contains a random portion of data.
///
/// Example: "erased" distribution key by projection operator.
Random,
/// Each segment contains a full copy of data.
///
/// Example: constant expressions.
Replicated,
/// Each segment contains a portion of data,
/// determined by the distribution key rule.
///
/// Example: segmented table.
/// A tuple can be located on any data node
Any,
/// Tuple distribution is set by the distribution key.
/// Example: tuples from the segmented table.
Segment {
/// A set of segment keys (we can have multiple keys after join)
/// A set of distribution keys (we can have multiple keys after join)
keys: HashSet<Key>,
},
}
impl Distribution {
fn new_from_two_children(
left: Distribution,
right: Distribution,
is_join: bool,
) -> Result<Distribution, QueryPlannerError> {
/// Calculate a new distribution for the `UnionAll` output tuple.
fn union(left: &Distribution, right: &Distribution) -> Distribution {
match left {
Distribution::Coordinator => match right {
Distribution::Coordinator => Ok(left),
_ => Err(QueryPlannerError::RequireMotion),
},
Distribution::Random => match right {
Distribution::Coordinator => Err(QueryPlannerError::RequireMotion),
_ => Ok(left),
},
Distribution::Replicated => match right {
Distribution::Coordinator => Err(QueryPlannerError::RequireMotion),
_ => Ok(right),
},
Distribution::Any => Distribution::Any,
Distribution::Segment {
keys: ref left_keys,
keys: ref keys_left,
..
} => match right {
Distribution::Random => Ok(Distribution::Random),
Distribution::Replicated => Ok(left),
Distribution::Any => Distribution::Any,
Distribution::Segment {
keys: ref right_keys,
keys: ref keys_right,
..
} => {
let mut keys: HashSet<Key> = HashSet::new();
if is_join {
for key in left_keys.union(right_keys) {
keys.insert(Key::new(key.positions.clone()));
}
} else {
for key in left_keys.intersection(right_keys) {
keys.insert(Key::new(key.positions.clone()));
}
for key in keys_left.intersection(keys_right) {
keys.insert(Key::new(key.positions.clone()));
}
if keys.is_empty() {
Ok(Distribution::Random)
Distribution::Any
} else {
Ok(Distribution::Segment { keys })
Distribution::Segment { keys }
}
}
Distribution::Coordinator => Err(QueryPlannerError::RequireMotion),
},
}
}
/// Calculate a new distribution for the `UnionAll` output tuple.
///
/// # Errors
/// Returns `QueryPlannerError`:
/// - distribution conflict that should be resolved by adding a `Motion` node
pub fn new_union(
left: Distribution,
right: Distribution,
) -> Result<Distribution, QueryPlannerError> {
Distribution::new_from_two_children(left, right, false)
}
/// Calculate a new distribution for the `InnerJoin` output tuple.
///
/// # Errors
/// Returns `QueryPlannerError`:
/// - distribution conflict that should be resolved by adding a `Motion` node
pub fn new_join(
left: Distribution,
right: Distribution,
) -> Result<Distribution, QueryPlannerError> {
Distribution::new_from_two_children(left, right, true)
/// Calculate a new distribution for the tuple combined from the two different tuples.
fn join(left: &Distribution, right: Distribution) -> Distribution {
match left {
Distribution::Any => right,
Distribution::Segment {
keys: ref keys_left,
..
} => match right {
Distribution::Any => Distribution::Any,
Distribution::Segment {
keys: ref keys_right,
..
} => {
let mut keys: HashSet<Key> = HashSet::new();
for key in keys_left.union(keys_right) {
keys.insert(Key::new(key.positions.clone()));
}
if keys.is_empty() {
Distribution::Any
} else {
Distribution::Segment { keys }
}
}
},
}
}
}
......@@ -146,49 +114,57 @@ impl Plan {
let mut table_pos_map: HashMap<usize, usize> = HashMap::default();
let mut parent_node: Option<usize> = None;
if let Node::Expression(Expression::Row { list, .. }) = self.get_node(row_node)? {
// Gather information about children nodes, that are pointed by the row references.
for (pos, alias_node) in list.iter().enumerate() {
if let Node::Expression(Expression::Alias { child, .. }) =
self.get_node(*alias_node)?
let mut populate_maps = |pos: usize, node_id: usize| -> Result<(), QueryPlannerError> {
if let Node::Expression(Expression::Reference {
targets,
position,
parent,
..
}) = self.get_node(node_id)?
{
// Get a relational node, containing current row.
parent_node = Some(*id_map.get(parent).ok_or(QueryPlannerError::InvalidNode)?);
if let Node::Relational(relational_op) =
self.get_node(parent_node.ok_or(QueryPlannerError::InvalidNode)?)?
{
if let Node::Expression(Expression::Reference {
targets,
position,
parent,
..
}) = self.get_node(*child)?
{
parent_node = Some(self.get_map_relational_value(*parent)?);
let relational_op = self.get_relation_node(
parent_node.ok_or(QueryPlannerError::InvalidRelation)?,
)?;
if let Some(children) = relational_op.children() {
// References in the branch node.
let child_pos_list: &Vec<usize> = targets
.as_ref()
.ok_or(QueryPlannerError::InvalidReference)?;
for target in child_pos_list {
let child_node: usize = *children
.get(*target)
.ok_or(QueryPlannerError::ValueOutOfRange)?;
child_set.insert(child_node);
child_pos_map.insert((child_node, *position), pos);
}
if let Some(children) = relational_op.children() {
// References in the branch node.
let child_pos_list: &Vec<usize> = targets
.as_ref()
.ok_or(QueryPlannerError::InvalidReference)?;
for target in child_pos_list {
let child_node: usize = *children
.get(*target)
.ok_or(QueryPlannerError::ValueOutOfRange)?;
child_set.insert(child_node);
child_pos_map.insert((child_node, *position), pos);
}
} else {
// References in the leaf (relation scan) node.
if targets.is_some() {
return Err(QueryPlannerError::InvalidReference);
}
if let Relational::ScanRelation { relation, .. } = relational_op {
table_set.insert(relation.clone());
table_pos_map.insert(*position, pos);
} else {
// References in the leaf (relation scan) node.
if targets.is_some() {
return Err(QueryPlannerError::InvalidReference);
}
if let Relational::ScanRelation { relation, .. } = relational_op {
table_set.insert(relation.clone());
table_pos_map.insert(*position, pos);
} else {
return Err(QueryPlannerError::InvalidReference);
}
return Err(QueryPlannerError::InvalidReference);
}
}
} else {
return Err(QueryPlannerError::InvalidNode);
}
}
Ok(())
};
if let Node::Expression(Expression::Row { list, .. }) = self.get_node(row_node)? {
// Gather information about children nodes, that are pointed by the row references.
for (pos, node) in list.iter().enumerate() {
if let Node::Expression(Expression::Alias { child, .. }) = self.get_node(*node)? {
populate_maps(pos, *child)?;
} else {
populate_maps(pos, *node)?;
}
}
}
......@@ -246,9 +222,7 @@ impl Plan {
{
match child_dist {
None => return Err(QueryPlannerError::UninitializedDistribution),
Some(Distribution::Random) => return Ok(Distribution::Random),
Some(Distribution::Replicated) => return Ok(Distribution::Replicated),
Some(Distribution::Coordinator) => return Ok(Distribution::Coordinator),
Some(Distribution::Any) => return Ok(Distribution::Any),
Some(Distribution::Segment { keys }) => {
let mut new_keys: HashSet<Key> = HashSet::new();
for key in keys {
......@@ -268,7 +242,7 @@ impl Plan {
}
if new_keys.is_empty() {
return Ok(Distribution::Random);
return Ok(Distribution::Any);
}
return Ok(Distribution::Segment { keys: new_keys });
}
......@@ -330,7 +304,7 @@ impl Plan {
.get_mut(row_node)
.ok_or(QueryPlannerError::InvalidRow)?
{
*distribution = Some(Distribution::Random);
*distribution = Some(Distribution::Any);
}
}
}
......@@ -378,7 +352,7 @@ impl Plan {
.get_mut(row_node)
.ok_or(QueryPlannerError::InvalidRow)?
{
*distribution = Some(Distribution::new_union(left_dist, right_dist)?);
*distribution = Some(Distribution::union(&left_dist, &right_dist));
}
} else if is_join {
if let Node::Expression(Expression::Row {
......@@ -390,7 +364,7 @@ impl Plan {
.get_mut(row_node)
.ok_or(QueryPlannerError::InvalidRow)?
{
*distribution = Some(Distribution::new_join(left_dist, right_dist)?);
*distribution = Some(Distribution::join(&left_dist, right_dist));
}
} else {
return Err(QueryPlannerError::InvalidNode);
......
......@@ -118,19 +118,18 @@ fn proj_shrink_dist_key_1() {
let proj_output = 14;
plan.set_distribution(scan_output).unwrap();
if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() {
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![1, 0]) }
},
scan_row.distribution().unwrap()
);
}
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![1, 0]) }
},
plan.get_distribution(scan_output).unwrap()
);
plan.set_distribution(proj_output).unwrap();
if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() {
assert_eq!(&Distribution::Random, proj_row.distribution().unwrap());
}
assert_eq!(
&Distribution::Any,
plan.get_distribution(proj_output).unwrap()
);
}
#[test]
......@@ -152,19 +151,18 @@ fn proj_shrink_dist_key_2() {
let proj_output = 12;
plan.set_distribution(scan_output).unwrap();
if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() {
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![1, 0]) }
},
scan_row.distribution().unwrap()
);
}
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![1, 0]) }
},
plan.get_distribution(scan_output).unwrap()
);
plan.set_distribution(proj_output).unwrap();
if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() {
assert_eq!(&Distribution::Random, proj_row.distribution().unwrap());
}
assert_eq!(
&Distribution::Any,
plan.get_distribution(proj_output).unwrap()
);
}
#[test]
......@@ -188,29 +186,26 @@ fn union_all_fallback_to_random() {
let union_output = 16;
plan.set_distribution(scan_t1_output).unwrap();
if let Node::Expression(scan_row) = plan.get_node(scan_t1_output).unwrap() {
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![0]) }
},
scan_row.distribution().unwrap()
);
}
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![0]) }
},
plan.get_distribution(scan_t1_output).unwrap()
);
plan.set_distribution(scan_t2_output).unwrap();
if let Node::Expression(scan_row) = plan.get_node(scan_t2_output).unwrap() {
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![1]) }
},
scan_row.distribution().unwrap()
);
}
assert_eq!(
&Distribution::Segment {
keys: collection! { Key::new(vec![1]) }
},
plan.get_distribution(scan_t2_output).unwrap()
);
plan.set_distribution(union_output).unwrap();
if let Node::Expression(scan_row) = plan.get_node(union_output).unwrap() {
assert_eq!(&Distribution::Random, scan_row.distribution().unwrap());
}
assert_eq!(
&Distribution::Any,
plan.get_distribution(union_output).unwrap()
);
}
#[test]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment