Skip to content
Snippets Groups Projects
Commit 7e58a704 authored by Arseniy Volynets's avatar Arseniy Volynets Committed by Denis Smirnov
Browse files

feat!: replace Replicated with Global

- now Motion(Full) has Global distribution
- Relational::Values has Any distribution
- Replicated distribution was removed, Distribution::Any is
used instead for projection with constants
parent f5f4b973
No related branches found
No related tags found
1 merge request!1414sbroad import
...@@ -1562,6 +1562,7 @@ impl Ast for AbstractSyntaxTree { ...@@ -1562,6 +1562,7 @@ impl Ast for AbstractSyntaxTree {
map.add(id, plan_union_all_id); map.add(id, plan_union_all_id);
} }
Type::ValuesRow => { Type::ValuesRow => {
// TODO(ars): check that all row elements are constants
let ast_child_id = node.children.first().ok_or_else(|| { let ast_child_id = node.children.first().ok_or_else(|| {
SbroadError::UnexpectedNumberOfValues("Values Row has no children.".into()) SbroadError::UnexpectedNumberOfValues("Values Row has no children.".into())
})?; })?;
......
...@@ -55,7 +55,7 @@ impl TryFrom<&MotionKey> for KeySet { ...@@ -55,7 +55,7 @@ impl TryFrom<&MotionKey> for KeySet {
Action::Create, Action::Create,
Some(Entity::DistributionKey), Some(Entity::DistributionKey),
format!("found value target in motion key: {v}"), format!("found value target in motion key: {v}"),
)) ));
} }
} }
} }
...@@ -89,25 +89,28 @@ impl From<HashSet<Key, RepeatableState>> for KeySet { ...@@ -89,25 +89,28 @@ impl From<HashSet<Key, RepeatableState>> for KeySet {
/// Tuple distribution in the cluster. /// Tuple distribution in the cluster.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum Distribution { pub enum Distribution {
/// A tuple can be located on any data node. /// The output of relational operator with this distribution
/// can be located on several storages (maybe zero or one).
/// Example: projection removes the segment key columns. /// Example: projection removes the segment key columns.
Any, Any,
/// A tuple is located on all data nodes and on coordinator /// The output of relational operator with this distribution
/// (constants). /// can be located on several storages (maybe zero or one).
/// But if the data is present on the node, it is located
/// as if it is a sharded by any of the keys in the keyset.
/// ///
/// **Note**: the same data may appear with different rows number on different storages.
/// E.g. when we execute `select 1 from "t"`, where "t" is sharded on two storages (s1 contains
/// 1 row and s2 contains 2 rows), we will get upper Projection output row with distribution
/// `Replicated`, but on different storages it will have different number of rows.
Replicated,
/// Tuple distribution is set by the distribution key.
/// Example: tuples from the segmented table. /// Example: tuples from the segmented table.
Segment { Segment {
/// A set of distribution keys (we can have multiple keys after join) /// A set of distribution keys (we can have multiple keys after join)
keys: KeySet, keys: KeySet,
}, },
/// A tuple is located exactly only on one node /// A subtree with relational operator that has this distribution is guaranteed
/// to be executed on a single node.
Single, Single,
/// If subtree which top has `Global` distribution is executed on several nodes,
/// then on each node output table will be exactly the same table.
///
/// Example: scan of global tables, motion with policy full.
Global,
} }
impl Distribution { impl Distribution {
...@@ -118,14 +121,16 @@ impl Distribution { ...@@ -118,14 +121,16 @@ impl Distribution {
right: &Distribution, right: &Distribution,
) -> Result<Distribution, SbroadError> { ) -> Result<Distribution, SbroadError> {
let dist = match (left, right) { let dist = match (left, right) {
(Distribution::Single, Distribution::Global)
| (Distribution::Global, Distribution::Single) => Distribution::Single,
(Distribution::Single, _) | (_, Distribution::Single) => { (Distribution::Single, _) | (_, Distribution::Single) => {
return Err(SbroadError::Invalid( return Err(SbroadError::Invalid(
Entity::Distribution, Entity::Distribution,
Some(format!("union/except child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))) Some(format!("union/except child has unexpected distribution Single. Left: {left:?}, right: {right:?}"))));
} }
(Distribution::Any, _) | (_, Distribution::Any) => Distribution::Any, (Distribution::Any, _) | (_, Distribution::Any) => Distribution::Any,
(Distribution::Replicated, _) | (_, Distribution::Replicated) => { (Distribution::Global, _) | (_, Distribution::Global) => {
Distribution::Replicated unimplemented!()
} }
( (
Distribution::Segment { Distribution::Segment {
...@@ -155,15 +160,15 @@ impl Distribution { ...@@ -155,15 +160,15 @@ impl Distribution {
(Distribution::Single, _) | (_, Distribution::Single) => { (Distribution::Single, _) | (_, Distribution::Single) => {
return Err(SbroadError::Invalid( return Err(SbroadError::Invalid(
Entity::Distribution, Entity::Distribution,
Some(format!("join child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))) Some(format!("join child has unexpected distribution Single. Left: {left:?}, right: {right:?}"))));
} }
(Distribution::Any, Distribution::Any | Distribution::Replicated) (Distribution::Any, Distribution::Any) => Distribution::Any,
| (Distribution::Replicated, Distribution::Any) => Distribution::Any, // Currently Global distribution is possible only from
(Distribution::Replicated, Distribution::Replicated) => Distribution::Replicated, // Motion node, that appeared after conflict resolution.
(Distribution::Any | Distribution::Replicated, Distribution::Segment { .. }) => { (Distribution::Global, _) | (Distribution::Any, Distribution::Segment { .. }) => {
right.clone() right.clone()
} }
(Distribution::Segment { .. }, Distribution::Any | Distribution::Replicated) => { (_, Distribution::Global) | (Distribution::Segment { .. }, Distribution::Any) => {
left.clone() left.clone()
} }
( (
...@@ -310,25 +315,16 @@ impl Plan { ...@@ -310,25 +315,16 @@ impl Plan {
let mut parent_node = None; let mut parent_node = None;
let mut only_compound_exprs = true; let mut only_compound_exprs = true;
let mut contains_non_const_expr = false;
for id in row_children.iter() { for id in row_children.iter() {
let child_id = row_child_id(*id)?; let child_id = row_child_id(*id)?;
match self.get_expression_node(child_id)? { if let Expression::Reference { parent, .. } = self.get_expression_node(child_id)? {
Expression::Reference { parent, .. } => { parent_node = *parent;
parent_node = *parent; only_compound_exprs = false;
only_compound_exprs = false; break;
break;
}
Expression::Constant { .. } => {
only_compound_exprs = false;
}
_ => {
contains_non_const_expr = true;
}
} }
} }
// if node's output consists ONLY of non-const expressions, // if node's output consists ONLY of compound expressions,
// we can't make any assumptions about its distribution. // we can't make any assumptions about its distribution.
// e.g select a + b from t // e.g select a + b from t
// Here Projection must have Distribution::Any // Here Projection must have Distribution::Any
...@@ -342,16 +338,7 @@ impl Plan { ...@@ -342,16 +338,7 @@ impl Plan {
let parent_id: usize = if let Some(parent_id) = parent_node { let parent_id: usize = if let Some(parent_id) = parent_node {
parent_id parent_id
} else { } else {
if contains_non_const_expr { self.set_dist(row_id, Distribution::Any)?;
// Row does not contain standalone references, but
// contains some non-const expression:
// select a+b, 1 from t
self.set_dist(row_id, Distribution::Any)?;
} else {
// All children are constants:
// select 1 from t
self.set_const_dist(row_id)?;
}
return Ok(()); return Ok(());
}; };
let parent = self.get_relation_node(parent_id)?; let parent = self.get_relation_node(parent_id)?;
...@@ -389,25 +376,6 @@ impl Plan { ...@@ -389,25 +376,6 @@ impl Plan {
} }
} }
} }
// The parent node is VALUES.
if let Relational::Values { .. } = parent {
let mut dist = Distribution::Replicated;
for child_id in ref_nodes {
let right_dist = self.dist_from_child(child_id, &ref_map)?;
dist = Distribution::union_except(&dist, &right_dist)?;
}
let output = self.get_mut_expression_node(row_id)?;
if let Expression::Row {
ref mut distribution,
..
} = output
{
if distribution.is_none() {
*distribution = Some(dist);
}
}
return Ok(());
}
match ref_nodes { match ref_nodes {
ReferredNodes::None => { ReferredNodes::None => {
...@@ -495,11 +463,11 @@ impl Plan { ...@@ -495,11 +463,11 @@ impl Plan {
return Err(SbroadError::Invalid( return Err(SbroadError::Invalid(
Entity::Distribution, Entity::Distribution,
Some("distribution is uninitialized".to_string()), Some("distribution is uninitialized".to_string()),
)) ));
} }
Some(Distribution::Single) => return Ok(Distribution::Single), Some(Distribution::Single) => return Ok(Distribution::Single),
Some(Distribution::Any) => return Ok(Distribution::Any), Some(Distribution::Any) => return Ok(Distribution::Any),
Some(Distribution::Replicated) => return Ok(Distribution::Replicated), Some(Distribution::Global) => return Ok(Distribution::Global),
Some(Distribution::Segment { keys }) => { Some(Distribution::Segment { keys }) => {
let mut new_keys: HashSet<Key, RepeatableState> = let mut new_keys: HashSet<Key, RepeatableState> =
HashSet::with_hasher(RepeatableState); HashSet::with_hasher(RepeatableState);
...@@ -534,14 +502,6 @@ impl Plan { ...@@ -534,14 +502,6 @@ impl Plan {
Err(SbroadError::Invalid(Entity::Relational, None)) Err(SbroadError::Invalid(Entity::Relational, None))
} }
/// Sets row distribution to replicated.
///
/// # Errors
/// - Node is not of a row type.
pub fn set_const_dist(&mut self, row_id: usize) -> Result<(), SbroadError> {
self.set_dist(row_id, Distribution::Replicated)
}
/// Sets the `Distribution` of row to given one /// Sets the `Distribution` of row to given one
/// ///
/// # Errors /// # Errors
...@@ -613,7 +573,7 @@ impl Plan { ...@@ -613,7 +573,7 @@ impl Plan {
return Err(SbroadError::Invalid( return Err(SbroadError::Invalid(
Entity::Relational, Entity::Relational,
Some("expected Except, UnionAll or InnerJoin".to_string()), Some("expected Except, UnionAll or InnerJoin".to_string()),
)) ));
} }
}; };
let expr = self.get_mut_expression_node(row_id)?; let expr = self.get_mut_expression_node(row_id)?;
......
...@@ -1351,7 +1351,7 @@ impl Plan { ...@@ -1351,7 +1351,7 @@ impl Plan {
} }
} }
MotionPolicy::Full => { MotionPolicy::Full => {
self.set_const_dist(output)?; self.set_dist(output, Distribution::Global)?;
} }
MotionPolicy::Local => { MotionPolicy::Local => {
self.set_dist(output, Distribution::Any)?; self.set_dist(output, Distribution::Any)?;
......
...@@ -1299,10 +1299,8 @@ impl Plan { ...@@ -1299,10 +1299,8 @@ impl Plan {
join_kind, join_kind,
) )
} }
(Distribution::Replicated | Distribution::Any, Distribution::Single) => { (Distribution::Any, Distribution::Single) => (MotionPolicy::None, MotionPolicy::Full),
(MotionPolicy::None, MotionPolicy::Full) (Distribution::Single, Distribution::Any) => {
}
(Distribution::Single, Distribution::Replicated | Distribution::Any) => {
if let JoinKind::LeftOuter = join_kind { if let JoinKind::LeftOuter = join_kind {
// outer table can't be safely broadcasted in case of LeftJoin see // outer table can't be safely broadcasted in case of LeftJoin see
// https://git.picodata.io/picodata/picodata/sbroad/-/issues/248 // https://git.picodata.io/picodata/picodata/sbroad/-/issues/248
...@@ -1527,7 +1525,7 @@ impl Plan { ...@@ -1527,7 +1525,7 @@ impl Plan {
} }
} }
if let Relational::Values { .. } = self.get_relation_node(child_id)? { if let Relational::Values { .. } = self.get_relation_node(child_id)? {
if let Distribution::Replicated = child_dist { if let Distribution::Any = child_dist {
map.add_child( map.add_child(
child_id, child_id,
MotionPolicy::LocalSegment(motion_key), MotionPolicy::LocalSegment(motion_key),
...@@ -1734,12 +1732,16 @@ impl Plan { ...@@ -1734,12 +1732,16 @@ impl Plan {
// sub queries. // sub queries.
Relational::ScanRelation { output, .. } Relational::ScanRelation { output, .. }
| Relational::ScanSubQuery { output, .. } | Relational::ScanSubQuery { output, .. }
| Relational::Values { output, .. }
| Relational::GroupBy { output, .. } | Relational::GroupBy { output, .. }
| Relational::Having { output, .. } | Relational::Having { output, .. }
| Relational::ValuesRow { output, .. } => { | Relational::ValuesRow { output, .. } => {
self.set_distribution(output)?; self.set_distribution(output)?;
} }
Relational::Values { output, .. } => {
// TODO(ars): replace with Global, when it is fully
// supported.
self.set_dist(output, Distribution::Any)?;
}
Relational::Projection { Relational::Projection {
output: proj_output_id, output: proj_output_id,
.. ..
......
...@@ -177,7 +177,7 @@ nodes: ...@@ -177,7 +177,7 @@ nodes:
Row: Row:
list: list:
- 28 - 28
distribution: Replicated distribution: Global
- Relational: - Relational:
Motion: Motion:
children: children:
......
...@@ -197,7 +197,7 @@ nodes: ...@@ -197,7 +197,7 @@ nodes:
Row: Row:
list: list:
- 30 - 30
distribution: Replicated distribution: Global
- Relational: - Relational:
Motion: Motion:
children: children:
......
...@@ -306,7 +306,7 @@ nodes: ...@@ -306,7 +306,7 @@ nodes:
Row: Row:
list: list:
- 48 - 48
distribution: Replicated distribution: Global
- Relational: - Relational:
Motion: Motion:
children: children:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment