From 7e58a70483eee8a5aee1930b81e6a87920595f42 Mon Sep 17 00:00:00 2001
From: Arseniy Volynets <vol0ncar@yandex.ru>
Date: Mon, 16 Oct 2023 10:59:15 +0300
Subject: [PATCH] feat!: replace Replicated with Global

- now Motion(Full) has Global distribution
- Relational::Values has Any distribution
- Replicated distribution was removed, Distribution::Any is
used instead for projection with constants
---
 sbroad-core/src/frontend/sql.rs               |   1 +
 sbroad-core/src/ir/distribution.rs            | 112 ++++++------------
 sbroad-core/src/ir/operator.rs                |   2 +-
 .../src/ir/transformation/redistribution.rs   |  14 ++-
 .../full_motion_less_for_sub_query.yaml       |   2 +-
 ...otion_non_segment_outer_for_sub_query.yaml |   2 +-
 .../redistribution/multiple_sub_queries.yaml  |   2 +-
 7 files changed, 49 insertions(+), 86 deletions(-)

diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs
index 9ae2f5238a..a52babb146 100644
--- a/sbroad-core/src/frontend/sql.rs
+++ b/sbroad-core/src/frontend/sql.rs
@@ -1562,6 +1562,7 @@ impl Ast for AbstractSyntaxTree {
                     map.add(id, plan_union_all_id);
                 }
                 Type::ValuesRow => {
+                    // TODO(ars): check that all row elements are constants
                     let ast_child_id = node.children.first().ok_or_else(|| {
                         SbroadError::UnexpectedNumberOfValues("Values Row has no children.".into())
                     })?;
diff --git a/sbroad-core/src/ir/distribution.rs b/sbroad-core/src/ir/distribution.rs
index fdf54877f3..b2e2f867b3 100644
--- a/sbroad-core/src/ir/distribution.rs
+++ b/sbroad-core/src/ir/distribution.rs
@@ -55,7 +55,7 @@ impl TryFrom<&MotionKey> for KeySet {
                         Action::Create,
                         Some(Entity::DistributionKey),
                         format!("found value target in motion key: {v}"),
-                    ))
+                    ));
                 }
             }
         }
@@ -89,25 +89,28 @@ impl From<HashSet<Key, RepeatableState>> for KeySet {
 /// Tuple distribution in the cluster.
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum Distribution {
-    /// A tuple can be located on any data node.
+    /// The output of relational operator with this distribution
+    /// can be located on several storages (maybe zero or one).
     /// Example: projection removes the segment key columns.
     Any,
-    /// A tuple is located on all data nodes and on coordinator
-    /// (constants).
+    /// The output of relational operator with this distribution
+    /// can be located on several storages (maybe zero or one).
+    /// But if the data is present on the node, it is located
+    /// as if it is a sharded by any of the keys in the keyset.
     ///
-    /// **Note**: the same data may appear with different rows number on different storages.
-    /// E.g. when we execute `select 1 from "t"`, where "t" is sharded on two storages (s1 contains
-    /// 1 row and s2 contains 2 rows), we will get upper Projection output row with distribution
-    /// `Replicated`, but on different storages it will have different number of rows.
-    Replicated,
-    /// Tuple distribution is set by the distribution key.
     /// Example: tuples from the segmented table.
     Segment {
         /// A set of distribution keys (we can have multiple keys after join)
         keys: KeySet,
     },
-    /// A tuple is located exactly only on one node
+    /// A subtree with relational operator that has this distribution is guaranteed
+    /// to be executed on a single node.
     Single,
+    /// If subtree which top has `Global` distribution is executed on several nodes,
+    /// then on each node output table will be exactly the same table.
+    ///
+    /// Example: scan of global tables, motion with policy full.
+    Global,
 }
 
 impl Distribution {
@@ -118,14 +121,16 @@ impl Distribution {
         right: &Distribution,
     ) -> Result<Distribution, SbroadError> {
         let dist = match (left, right) {
+            (Distribution::Single, Distribution::Global)
+            | (Distribution::Global, Distribution::Single) => Distribution::Single,
             (Distribution::Single, _) | (_, Distribution::Single) => {
-                 return Err(SbroadError::Invalid(
-                     Entity::Distribution,
-                     Some(format!("union/except child has unexpected distribution Single. Left: {left:?}, right: {right:?}"))))   
+                return Err(SbroadError::Invalid(
+                    Entity::Distribution,
+                    Some(format!("union/except child has unexpected distribution Single. Left: {left:?}, right: {right:?}"))));
             }
             (Distribution::Any, _) | (_, Distribution::Any) => Distribution::Any,
-            (Distribution::Replicated, _) | (_, Distribution::Replicated) => {
-                Distribution::Replicated
+            (Distribution::Global, _) | (_, Distribution::Global) => {
+                unimplemented!()
             }
             (
                 Distribution::Segment {
@@ -155,15 +160,15 @@ impl Distribution {
             (Distribution::Single, _) | (_, Distribution::Single) => {
                 return Err(SbroadError::Invalid(
                     Entity::Distribution,
-                    Some(format!("join child has unexpected distribution Single. Left: {left:?}, right: {right:?}"))))
+                    Some(format!("join child has unexpected distribution Single. Left: {left:?}, right: {right:?}"))));
             }
-            (Distribution::Any, Distribution::Any | Distribution::Replicated)
-            | (Distribution::Replicated, Distribution::Any) => Distribution::Any,
-            (Distribution::Replicated, Distribution::Replicated) => Distribution::Replicated,
-            (Distribution::Any | Distribution::Replicated, Distribution::Segment { .. }) => {
+            (Distribution::Any, Distribution::Any) => Distribution::Any,
+            // Currently Global distribution is possible only from
+            // Motion node, that appeared after conflict resolution.
+            (Distribution::Global, _) | (Distribution::Any, Distribution::Segment { .. }) => {
                 right.clone()
             }
-            (Distribution::Segment { .. }, Distribution::Any | Distribution::Replicated) => {
+            (_, Distribution::Global) | (Distribution::Segment { .. }, Distribution::Any) => {
                 left.clone()
             }
             (
@@ -310,25 +315,16 @@ impl Plan {
 
         let mut parent_node = None;
         let mut only_compound_exprs = true;
-        let mut contains_non_const_expr = false;
         for id in row_children.iter() {
             let child_id = row_child_id(*id)?;
-            match self.get_expression_node(child_id)? {
-                Expression::Reference { parent, .. } => {
-                    parent_node = *parent;
-                    only_compound_exprs = false;
-                    break;
-                }
-                Expression::Constant { .. } => {
-                    only_compound_exprs = false;
-                }
-                _ => {
-                    contains_non_const_expr = true;
-                }
+            if let Expression::Reference { parent, .. } = self.get_expression_node(child_id)? {
+                parent_node = *parent;
+                only_compound_exprs = false;
+                break;
             }
         }
 
-        // if node's output consists ONLY of non-const expressions,
+        // if node's output consists ONLY of compound expressions,
         // we can't make any assumptions about its distribution.
         // e.g select a + b from t
         // Here Projection must have Distribution::Any
@@ -342,16 +338,7 @@ impl Plan {
         let parent_id: usize = if let Some(parent_id) = parent_node {
             parent_id
         } else {
-            if contains_non_const_expr {
-                // Row does not contain standalone references, but
-                // contains some non-const expression:
-                // select a+b, 1 from t
-                self.set_dist(row_id, Distribution::Any)?;
-            } else {
-                // All children are constants:
-                // select 1 from t
-                self.set_const_dist(row_id)?;
-            }
+            self.set_dist(row_id, Distribution::Any)?;
             return Ok(());
         };
         let parent = self.get_relation_node(parent_id)?;
@@ -389,25 +376,6 @@ impl Plan {
                     }
                 }
             }
-            // The parent node is VALUES.
-            if let Relational::Values { .. } = parent {
-                let mut dist = Distribution::Replicated;
-                for child_id in ref_nodes {
-                    let right_dist = self.dist_from_child(child_id, &ref_map)?;
-                    dist = Distribution::union_except(&dist, &right_dist)?;
-                }
-                let output = self.get_mut_expression_node(row_id)?;
-                if let Expression::Row {
-                    ref mut distribution,
-                    ..
-                } = output
-                {
-                    if distribution.is_none() {
-                        *distribution = Some(dist);
-                    }
-                }
-                return Ok(());
-            }
 
             match ref_nodes {
                 ReferredNodes::None => {
@@ -495,11 +463,11 @@ impl Plan {
                         return Err(SbroadError::Invalid(
                             Entity::Distribution,
                             Some("distribution is uninitialized".to_string()),
-                        ))
+                        ));
                     }
                     Some(Distribution::Single) => return Ok(Distribution::Single),
                     Some(Distribution::Any) => return Ok(Distribution::Any),
-                    Some(Distribution::Replicated) => return Ok(Distribution::Replicated),
+                    Some(Distribution::Global) => return Ok(Distribution::Global),
                     Some(Distribution::Segment { keys }) => {
                         let mut new_keys: HashSet<Key, RepeatableState> =
                             HashSet::with_hasher(RepeatableState);
@@ -534,14 +502,6 @@ impl Plan {
         Err(SbroadError::Invalid(Entity::Relational, None))
     }
 
-    /// Sets row distribution to replicated.
-    ///
-    /// # Errors
-    /// - Node is not of a row type.
-    pub fn set_const_dist(&mut self, row_id: usize) -> Result<(), SbroadError> {
-        self.set_dist(row_id, Distribution::Replicated)
-    }
-
     /// Sets the `Distribution` of row to given one
     ///
     /// # Errors
@@ -613,7 +573,7 @@ impl Plan {
                 return Err(SbroadError::Invalid(
                     Entity::Relational,
                     Some("expected Except, UnionAll or InnerJoin".to_string()),
-                ))
+                ));
             }
         };
         let expr = self.get_mut_expression_node(row_id)?;
diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs
index 0eb6797374..19d70102da 100644
--- a/sbroad-core/src/ir/operator.rs
+++ b/sbroad-core/src/ir/operator.rs
@@ -1351,7 +1351,7 @@ impl Plan {
                 }
             }
             MotionPolicy::Full => {
-                self.set_const_dist(output)?;
+                self.set_dist(output, Distribution::Global)?;
             }
             MotionPolicy::Local => {
                 self.set_dist(output, Distribution::Any)?;
diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs
index 2294783529..ec6e940981 100644
--- a/sbroad-core/src/ir/transformation/redistribution.rs
+++ b/sbroad-core/src/ir/transformation/redistribution.rs
@@ -1299,10 +1299,8 @@ impl Plan {
                     join_kind,
                 )
             }
-            (Distribution::Replicated | Distribution::Any, Distribution::Single) => {
-                (MotionPolicy::None, MotionPolicy::Full)
-            }
-            (Distribution::Single, Distribution::Replicated | Distribution::Any) => {
+            (Distribution::Any, Distribution::Single) => (MotionPolicy::None, MotionPolicy::Full),
+            (Distribution::Single, Distribution::Any) => {
                 if let JoinKind::LeftOuter = join_kind {
                     // outer table can't be safely broadcasted in case of LeftJoin see
                     // https://git.picodata.io/picodata/picodata/sbroad/-/issues/248
@@ -1527,7 +1525,7 @@ impl Plan {
             }
         }
         if let Relational::Values { .. } = self.get_relation_node(child_id)? {
-            if let Distribution::Replicated = child_dist {
+            if let Distribution::Any = child_dist {
                 map.add_child(
                     child_id,
                     MotionPolicy::LocalSegment(motion_key),
@@ -1734,12 +1732,16 @@ impl Plan {
                 // sub queries.
                 Relational::ScanRelation { output, .. }
                 | Relational::ScanSubQuery { output, .. }
-                | Relational::Values { output, .. }
                 | Relational::GroupBy { output, .. }
                 | Relational::Having { output, .. }
                 | Relational::ValuesRow { output, .. } => {
                     self.set_distribution(output)?;
                 }
+                Relational::Values { output, .. } => {
+                    // TODO(ars): replace with Global, when it is fully
+                    // supported.
+                    self.set_dist(output, Distribution::Any)?;
+                }
                 Relational::Projection {
                     output: proj_output_id,
                     ..
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml
index e030a59e78..34676b43d5 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_less_for_sub_query.yaml
@@ -177,7 +177,7 @@ nodes:
         Row:
           list:
             - 28
-          distribution: Replicated
+          distribution: Global
     - Relational:
         Motion:
           children:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml
index 82dcec732c..0f9488b88d 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/full_motion_non_segment_outer_for_sub_query.yaml
@@ -197,7 +197,7 @@ nodes:
         Row:
           list:
             - 30
-          distribution: Replicated
+          distribution: Global
     - Relational:
         Motion:
           children:
diff --git a/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml b/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml
index 91ad623d02..be567d5d4a 100644
--- a/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml
+++ b/sbroad-core/tests/artifactory/ir/transformation/redistribution/multiple_sub_queries.yaml
@@ -306,7 +306,7 @@ nodes:
         Row:
           list:
             - 48
-          distribution: Replicated
+          distribution: Global
     - Relational:
         Motion:
           children:
-- 
GitLab