diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs index 9ae2f5238aecb68bccb5cac57bf8fd58de623792..a52babb146a3a8c1fcc4722033ee8ba0a4a4013d 100644 --- a/sbroad-core/src/frontend/sql.rs +++ b/sbroad-core/src/frontend/sql.rs @@ -1562,6 +1562,7 @@ impl Ast for AbstractSyntaxTree { map.add(id, plan_union_all_id); } Type::ValuesRow => { + // TODO(ars): check that all row elements are constants let ast_child_id = node.children.first().ok_or_else(|| { SbroadError::UnexpectedNumberOfValues("Values Row has no children.".into()) })?; diff --git a/sbroad-core/src/ir/distribution.rs b/sbroad-core/src/ir/distribution.rs index fdf54877f3b9642bef0c29e284b79eac0d8beb57..b2e2f867b39ef7b593ec807f632d7d56624ffead 100644 --- a/sbroad-core/src/ir/distribution.rs +++ b/sbroad-core/src/ir/distribution.rs @@ -55,7 +55,7 @@ impl TryFrom<&MotionKey> for KeySet { Action::Create, Some(Entity::DistributionKey), format!("found value target in motion key: {v}"), - )) + )); } } } @@ -89,25 +89,28 @@ impl From<HashSet<Key, RepeatableState>> for KeySet { /// Tuple distribution in the cluster. #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] pub enum Distribution { - /// A tuple can be located on any data node. + /// The output of relational operator with this distribution + /// can be located on several storages (maybe zero or one). /// Example: projection removes the segment key columns. Any, - /// A tuple is located on all data nodes and on coordinator - /// (constants). + /// The output of relational operator with this distribution + /// can be located on several storages (maybe zero or one). + /// But if the data is present on the node, it is located + /// as if it is a sharded by any of the keys in the keyset. /// - /// **Note**: the same data may appear with different rows number on different storages. - /// E.g. when we execute `select 1 from "t"`, where "t" is sharded on two storages (s1 contains - /// 1 row and s2 contains 2 rows), we will get upper Projection output row with distribution - /// `Replicated`, but on different storages it will have different number of rows. - Replicated, - /// Tuple distribution is set by the distribution key. /// Example: tuples from the segmented table. Segment { /// A set of distribution keys (we can have multiple keys after join) keys: KeySet, }, - /// A tuple is located exactly only on one node + /// A subtree with relational operator that has this distribution is guaranteed + /// to be executed on a single node. Single, + /// If subtree which top has `Global` distribution is executed on several nodes, + /// then on each node output table will be exactly the same table. + /// + /// Example: scan of global tables, motion with policy full. + Global, } impl Distribution { @@ -118,14 +121,16 @@ impl Distribution { right: &Distribution, ) -> Result<Distribution, SbroadError> { let dist = match (left, right) { + (Distribution::Single, Distribution::Global) + | (Distribution::Global, Distribution::Single) => Distribution::Single, (Distribution::Single, _) | (_, Distribution::Single) => { - return Err(SbroadError::Invalid( - Entity::Distribution, - Some(format!("union/except child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))) + return Err(SbroadError::Invalid( + Entity::Distribution, + Some(format!("union/except child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))); } (Distribution::Any, _) | (_, Distribution::Any) => Distribution::Any, - (Distribution::Replicated, _) | (_, Distribution::Replicated) => { - Distribution::Replicated + (Distribution::Global, _) | (_, Distribution::Global) => { + unimplemented!() } ( Distribution::Segment { @@ -155,15 +160,15 @@ impl Distribution { (Distribution::Single, _) | (_, Distribution::Single) => { return Err(SbroadError::Invalid( Entity::Distribution, - Some(format!("join child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))) + Some(format!("join child has unexpected distribution Single. Left: {left:?}, right: {right:?}")))); } - (Distribution::Any, Distribution::Any | Distribution::Replicated) - | (Distribution::Replicated, Distribution::Any) => Distribution::Any, - (Distribution::Replicated, Distribution::Replicated) => Distribution::Replicated, - (Distribution::Any | Distribution::Replicated, Distribution::Segment { .. }) => { + (Distribution::Any, Distribution::Any) => Distribution::Any, + // Currently Global distribution is possible only from + // Motion node, that appeared after conflict resolution. + (Distribution::Global, _) | (Distribution::Any, Distribution::Segment { .. }) => { right.clone() } - (Distribution::Segment { .. }, Distribution::Any | Distribution::Replicated) => { + (_, Distribution::Global) | (Distribution::Segment { .. }, Distribution::Any) => { left.clone() } ( @@ -310,25 +315,16 @@ impl Plan { let mut parent_node = None; let mut only_compound_exprs = true; - let mut contains_non_const_expr = false; for id in row_children.iter() { let child_id = row_child_id(*id)?; - match self.get_expression_node(child_id)? { - Expression::Reference { parent, .. } => { - parent_node = *parent; - only_compound_exprs = false; - break; - } - Expression::Constant { .. } => { - only_compound_exprs = false; - } - _ => { - contains_non_const_expr = true; - } + if let Expression::Reference { parent, .. } = self.get_expression_node(child_id)? { + parent_node = *parent; + only_compound_exprs = false; + break; } } - // if node's output consists ONLY of non-const expressions, + // if node's output consists ONLY of compound expressions, // we can't make any assumptions about its distribution. // e.g select a + b from t // Here Projection must have Distribution::Any @@ -342,16 +338,7 @@ impl Plan { let parent_id: usize = if let Some(parent_id) = parent_node { parent_id } else { - if contains_non_const_expr { - // Row does not contain standalone references, but - // contains some non-const expression: - // select a+b, 1 from t - self.set_dist(row_id, Distribution::Any)?; - } else { - // All children are constants: - // select 1 from t - self.set_const_dist(row_id)?; - } + self.set_dist(row_id, Distribution::Any)?; return Ok(()); }; let parent = self.get_relation_node(parent_id)?; @@ -389,25 +376,6 @@ impl Plan { } } } - // The parent node is VALUES. - if let Relational::Values { .. } = parent { - let mut dist = Distribution::Replicated; - for child_id in ref_nodes { - let right_dist = self.dist_from_child(child_id, &ref_map)?; - dist = Distribution::union_except(&dist, &right_dist)?; - } - let output = self.get_mut_expression_node(row_id)?; - if let Expression::Row { - ref mut distribution, - .. - } = output - { - if distribution.is_none() { - *distribution = Some(dist); - } - } - return Ok(()); - } match ref_nodes { ReferredNodes::None => { @@ -495,11 +463,11 @@ impl Plan { return Err(SbroadError::Invalid( Entity::Distribution, Some("distribution is uninitialized".to_string()), - )) + )); } Some(Distribution::Single) => return Ok(Distribution::Single), Some(Distribution::Any) => return Ok(Distribution::Any), - Some(Distribution::Replicated) => return Ok(Distribution::Replicated), + Some(Distribution::Global) => return Ok(Distribution::Global), Some(Distribution::Segment { keys }) => { let mut new_keys: HashSet<Key, RepeatableState> = HashSet::with_hasher(RepeatableState); @@ -534,14 +502,6 @@ impl Plan { Err(SbroadError::Invalid(Entity::Relational, None)) } - /// Sets row distribution to replicated. - /// - /// # Errors - /// - Node is not of a row type. - pub fn set_const_dist(&mut self, row_id: usize) -> Result<(), SbroadError> { - self.set_dist(row_id, Distribution::Replicated) - } - /// Sets the `Distribution` of row to given one /// /// # Errors @@ -613,7 +573,7 @@ impl Plan { return Err(SbroadError::Invalid( Entity::Relational, Some("expected Except, UnionAll or InnerJoin".to_string()), - )) + )); } }; let expr = self.get_mut_expression_node(row_id)?; diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs index 0eb67973747f21234e5785c4a991c793f7ebf029..19d70102da16b7d14b5570491768c58519524b9a 100644 --- a/sbroad-core/src/ir/operator.rs +++ b/sbroad-core/src/ir/operator.rs @@ -1351,7 +1351,7 @@ impl Plan { } } MotionPolicy::Full => { - self.set_const_dist(output)?; + self.set_dist(output, Distribution::Global)?; } MotionPolicy::Local => { self.set_dist(output, Distribution::Any)?; diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index 22947835294dda872bc5f662b8e298d15b6e6cdf..ec6e9409812c4dfbd213994020ac815ef6137d1d 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -1299,10 +1299,8 @@ impl Plan { join_kind, ) } - (Distribution::Replicated | Distribution::Any, Distribution::Single) => { - (MotionPolicy::None, MotionPolicy::Full) - } - (Distribution::Single, Distribution::Replicated | Distribution::Any) => { + (Distribution::Any, Distribution::Single) => (MotionPolicy::None, MotionPolicy::Full), + (Distribution::Single, Distribution::Any) => { if let JoinKind::LeftOuter = join_kind { // outer table can't be safely broadcasted in case of LeftJoin see // https://git.picodata.io/picodata/picodata/sbroad/-/issues/248 @@ -1527,7 +1525,7 @@ impl Plan { } } if let Relational::Values { .. } = self.get_relation_node(child_id)? { - if let Distribution::Replicated = child_dist { + if let Distribution::Any = child_dist { map.add_child( child_id, MotionPolicy::LocalSegment(motion_key), @@ -1734,12 +1732,16 @@ impl Plan { // sub queries. Relational::ScanRelation { output, .. } | Relational::ScanSubQuery { output, .. } - | Relational::Values { output, .. } | Relational::GroupBy { output, .. } | Relational::Having { output, .. } | Relational::ValuesRow { output, .. } => { self.set_distribution(output)?; } + Relational::Values { output, .. } => { + // TODO(ars): replace with Global, when it is fully + // supported. + self.set_dist(output, Distribution::Any)?; + } Relational::Projection { output: proj_output_id, .. diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml index e030a59e78dc02d17871d95553bdfe9a55c9e90b..34676b43d5a436cf3e93edfc706e61e8308c3640 100644 --- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml +++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml @@ -177,7 +177,7 @@ nodes: Row: list: - 28 - distribution: Replicated + distribution: Global - Relational: Motion: children: diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml index 82dcec732cbfbbdde8332601fe535915a185940f..0f9488b88dedf6ce82aeb15c9468ac8ba6422c8c 100644 --- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml +++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml @@ -197,7 +197,7 @@ nodes: Row: list: - 30 - distribution: Replicated + distribution: Global - Relational: Motion: children: diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml index 91ad623d0241ec3bc584d429e68ae6f53c6242d1..be567d5d4ac3f79d11989dd2d40723dfaf71425d 100644 --- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml +++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml @@ -306,7 +306,7 @@ nodes: Row: list: - 48 - distribution: Replicated + distribution: Global - Relational: Motion: children: