From 41ee15652d8d3bb252576b0d5b826f7838cb5263 Mon Sep 17 00:00:00 2001
From: Arseniy Volynets <vol0ncar@yandex.ru>
Date: Mon, 30 Jan 2023 16:32:59 +0300
Subject: [PATCH] fix: Filter vtables in case of Buckets::All

Now, if the sub-plan is dispatched with Buckets::All and has segmented vtables,
its vtables are filtered by the buckets of each replica set.
---
 sbroad-cartridge/src/cartridge/router.rs | 174 +++++++++++++----------
 sbroad-core/src/executor/ir.rs           |   6 +
 2 files changed, 105 insertions(+), 75 deletions(-)

diff --git a/sbroad-cartridge/src/cartridge/router.rs b/sbroad-cartridge/src/cartridge/router.rs
index 6c86dfb762..374669aeec 100644
--- a/sbroad-cartridge/src/cartridge/router.rs
+++ b/sbroad-cartridge/src/cartridge/router.rs
@@ -173,6 +173,94 @@ impl Configuration for RouterRuntime {
     }
 }
 
+impl RouterRuntime { // Private helpers used by the `Coordinator` impl below.
+    fn filter_vtable(&self, plan: &mut ExecutionPlan, bucket_ids: &[u64]) {
+        if let Some(vtables) = plan.get_mut_vtables() {
+            for rc_vtable in vtables.values_mut() {
+                // If the virtual table is hashed by the bucket_id, we can filter its tuples.
+                // Otherwise (full motion policy) we need to preserve all tuples.
+                if !rc_vtable.get_index().is_empty() {
+                    *rc_vtable = Rc::new(rc_vtable.new_with_buckets(bucket_ids));
+                }
+            }
+        }
+    }
+
+    fn encode_plan(&self, exec_plan: ExecutionPlan) -> Result<(Binary, Binary), SbroadError> {
+        // Encodes the plan into (required, optional) binary parts. The storage cache must
+        // not be used if the plan contains virtual tables: they can hold a different amount
+        // of tuples that is not taken into account when calculating the cache key.
+        let can_be_cached = exec_plan.vtables_empty();
+        let sub_plan_id = exec_plan.get_ir_plan().pattern_id()?;
+        let sp_top_id = exec_plan.get_ir_plan().get_top()?;
+        let sp = SyntaxPlan::new(&exec_plan, sp_top_id, Snapshot::Oldest)?;
+        let ordered = OrderedSyntaxNodes::try_from(sp)?;
+        let nodes = ordered.to_syntax_data()?;
+        // Virtual tables in the plan must already be filtered, so we can use all buckets here.
+        let params = exec_plan.to_params(&nodes, &Buckets::All)?;
+        let query_type = exec_plan.query_type()?;
+        let required_data = RequiredData::new(sub_plan_id, params, query_type, can_be_cached);
+        let encoded_required_data = EncodedRequiredData::try_from(required_data)?;
+        let raw_required_data: Vec<u8> = encoded_required_data.into();
+        let optional_data = OptionalData::new(exec_plan, ordered);
+        let encoded_optional_data = EncodedOptionalData::try_from(optional_data)?;
+        let raw_optional_data: Vec<u8> = encoded_optional_data.into();
+        Ok((raw_required_data.into(), raw_optional_data.into()))
+    }
+
+    fn exec_with_filtered_buckets(
+        &self,
+        mut sub_plan: ExecutionPlan,
+        buckets: &Buckets,
+    ) -> Result<Box<dyn Any>, SbroadError> {
+        let query_type = sub_plan.query_type()?;
+        let conn_type = sub_plan.connection_type()?;
+        let bucket_set = if let Buckets::Filtered(bucket_set) = buckets {
+            bucket_set
+        } else {
+            return Err(SbroadError::Invalid(
+                Entity::Buckets,
+                Some(format!("Expected Buckets::Filtered, got {buckets:?}")),
+            ));
+        };
+        let random_bucket = self.get_random_bucket();
+        let buckets = if bucket_set.is_empty() {
+            // There are no buckets to execute the query on.
+            // At the moment we don't keep types inside our IR tree and
+            // there is no easy way to get column types in the result.
+            // So we just choose a random bucket to execute the query on,
+            // as we are sure that any bucket returns an empty result.
+
+            // TODO: return an empty result without actual execution.
+            &random_bucket
+        } else {
+            buckets
+        };
+
+        let mut rs_ir: HashMap<String, Message> = HashMap::new(); // replica set -> encoded plan
+        let rs_bucket_vec: Vec<(String, Vec<u64>)> = group(buckets)?.drain().collect();
+        if rs_bucket_vec.is_empty() {
+            return Err(SbroadError::UnexpectedNumberOfValues(format!(
+                "no replica sets were found for the buckets {buckets:?} to execute the query on"
+            )));
+        }
+        rs_ir.reserve(rs_bucket_vec.len());
+
+        if let Some((last, other)) = rs_bucket_vec.split_last() {
+            for (rs, bucket_ids) in other {
+                let mut rs_plan = sub_plan.clone();
+                self.filter_vtable(&mut rs_plan, bucket_ids);
+                rs_ir.insert(rs.clone(), Message::from(self.encode_plan(rs_plan)?));
+            }
+
+            let (rs, bucket_ids) = last; // the last one takes `sub_plan` itself, saving a clone
+            self.filter_vtable(&mut sub_plan, bucket_ids);
+            rs_ir.insert(rs.clone(), Message::from(self.encode_plan(sub_plan)?));
+        }
+        self.exec_ir_on_some(rs_ir, query_type, conn_type)
+    }
+}
+
 impl Coordinator for RouterRuntime {
     type ParseTree = AbstractSyntaxTree;
     type Cache = LRUCache<String, Plan>;
@@ -200,88 +288,24 @@ impl Coordinator for RouterRuntime {
             Option::from("dispatch"),
             &format!("dispatching plan: {plan:?}")
         );
-        let mut sub_plan = plan.take_subtree(top_id)?;
-        let sub_plan_id = sub_plan.get_ir_plan().pattern_id()?;
+        let sub_plan = plan.take_subtree(top_id)?;
         let query_type = sub_plan.query_type()?;
         let conn_type = sub_plan.connection_type()?;
         debug!(Option::from("dispatch"), &format!("sub plan: {sub_plan:?}"));
 
-        let filter_vtable = |plan: &mut ExecutionPlan, bucket_ids: &[u64]| {
-            if let Some(vtables) = plan.get_mut_vtables() {
-                for rc_vtable in vtables.values_mut() {
-                    // If the virtual table id hashed by the bucket_id, we can filter its tuples.
-                    // Otherwise (full motion policy) we need to preserve all tuples.
-                    if !rc_vtable.get_index().is_empty() {
-                        *rc_vtable = Rc::new(rc_vtable.new_with_buckets(bucket_ids));
-                    }
+        match buckets {
+            Buckets::Filtered(_) => self.exec_with_filtered_buckets(sub_plan, buckets),
+            Buckets::All => {
+                if sub_plan.has_segmented_tables() {
+                    let bucket_set: HashSet<u64, RepeatableState> =
+                        (1..=self.bucket_count as u64).into_iter().collect();
+                    let all_buckets = Buckets::new_filtered(bucket_set);
+                    return self.exec_with_filtered_buckets(sub_plan, &all_buckets);
                 }
+                let (required, optional) = self.encode_plan(sub_plan)?;
+                self.exec_ir_on_all(required, optional, query_type, conn_type)
             }
-        };
-
-        // We should not use the cache on the storage if the plan contains virtual tables,
-        // as they can contain different amount of tuples that are not taken into account
-        // when calculating the cache key.
-        let can_be_cached = plan.vtables_empty();
-
-        let encode_plan = |exec_plan: ExecutionPlan| -> Result<(Binary, Binary), SbroadError> {
-            let sp_top_id = exec_plan.get_ir_plan().get_top()?;
-            let sp = SyntaxPlan::new(&exec_plan, sp_top_id, Snapshot::Oldest)?;
-            let ordered = OrderedSyntaxNodes::try_from(sp)?;
-            let nodes = ordered.to_syntax_data()?;
-            // Virtual tables in the plan must be already filtered, so we can use all buckets here.
-            let params = exec_plan.to_params(&nodes, &Buckets::All)?;
-            let query_type = exec_plan.query_type()?;
-            let required_data =
-                RequiredData::new(sub_plan_id.clone(), params, query_type, can_be_cached);
-            let encoded_required_data = EncodedRequiredData::try_from(required_data)?;
-            let raw_required_data: Vec<u8> = encoded_required_data.into();
-            let optional_data = OptionalData::new(exec_plan, ordered);
-            let encoded_optional_data = EncodedOptionalData::try_from(optional_data)?;
-            let raw_optional_data: Vec<u8> = encoded_optional_data.into();
-            Ok((raw_required_data.into(), raw_optional_data.into()))
-        };
-
-        if let Buckets::Filtered(bucket_set) = buckets {
-            let random_bucket = self.get_random_bucket();
-            let buckets = if bucket_set.is_empty() {
-                // There are no buckets to execute the query on.
-                // At the moment we don't keep types inside our IR tree and
-                // there is no easy way to get column types in the result.
-                // So we just choose a random bucket and to execute the query on,
-                // as we are sure that any bucket returns an empty result.
-
-                // TODO: return an empty result without actual execution.
-                &random_bucket
-            } else {
-                buckets
-            };
-
-            let mut rs_ir: HashMap<String, Message> = HashMap::new();
-            let rs_bucket_vec: Vec<(String, Vec<u64>)> = group(buckets)?.drain().collect();
-            if rs_bucket_vec.is_empty() {
-                return Err(SbroadError::UnexpectedNumberOfValues(format!(
-                    "no replica sets were found for the buckets {:?} to execute the query on",
-                    buckets
-                )));
-            }
-            rs_ir.reserve(rs_bucket_vec.len());
-
-            if let Some((last, other)) = rs_bucket_vec.split_last() {
-                for (rs, bucket_ids) in other {
-                    let mut rs_plan = sub_plan.clone();
-                    filter_vtable(&mut rs_plan, bucket_ids);
-                    rs_ir.insert(rs.clone(), Message::from(encode_plan(rs_plan)?));
-                }
-
-                let (rs, bucket_ids) = last;
-                filter_vtable(&mut sub_plan, bucket_ids);
-                rs_ir.insert(rs.clone(), Message::from(encode_plan(sub_plan)?));
-            }
-            return self.exec_ir_on_some(rs_ir, query_type, conn_type);
         }
-
-        let (required, optional) = encode_plan(sub_plan)?;
-        self.exec_ir_on_all(required, optional, query_type, conn_type)
     }
 
     fn explain_format(&self, explain: String) -> Result<Box<dyn Any>, SbroadError> {
diff --git a/sbroad-core/src/executor/ir.rs b/sbroad-core/src/executor/ir.rs
index e03cdf5871..faefbe7f5d 100644
--- a/sbroad-core/src/executor/ir.rs
+++ b/sbroad-core/src/executor/ir.rs
@@ -95,6 +95,12 @@ impl ExecutionPlan {
         ))
     }
 
+    pub fn has_segmented_tables(&self) -> bool { // true if any vtable has a non-empty index
+        self.vtables.as_ref().map_or(false, |vtable_map| {
+            vtable_map.map().values().any(|t| !t.get_index().is_empty())
+        })
+    }
+
     /// Extract policy from motion node
     ///
     /// # Errors
-- 
GitLab