From 8507c789934ded8c00c14e8cfa2ab1e611250039 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Mon, 7 Oct 2024 16:50:10 +0700
Subject: [PATCH] fix: bucket discovery for local motions

Previously, during bucket discovery, we used Buckets::Any for
Motion(Local) nodes. This caused DML queries to be executed on all
nodes instead of targeting specific bucket children. We now apply
Motion(Local) only in the following cases:

- update/delete. When materializing the reading subtree for DML
  operations, Buckets::Any was used, but the reason for this is
  unclear.
- union all between sharded and local tables. To prevent duplicates,
  we materialize the global subtree only on a single storage node.
  Consequently, the subtree with Motion(Local) must have the same
  buckets as its child (the child node will always have Buckets::Any).

Co-authored-by: Arseniy Volynets <a.volynets@picodata.io>
---
 sbroad-core/src/executor/bucket.rs       | 17 +++++++++++++-
 sbroad-core/src/executor/bucket/tests.rs | 28 +++++++++++++++++++++++-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/sbroad-core/src/executor/bucket.rs b/sbroad-core/src/executor/bucket.rs
index 4e385ad7b..0a22474be 100644
--- a/sbroad-core/src/executor/bucket.rs
+++ b/sbroad-core/src/executor/bucket.rs
@@ -335,7 +335,7 @@ where
                     output,
                     ..
                 }) => match policy {
-                    MotionPolicy::Full | MotionPolicy::Local => {
+                    MotionPolicy::Full => {
                         self.bucket_map.insert(*output, Buckets::Any);
                     }
                     MotionPolicy::Segment(_) => {
@@ -348,6 +348,21 @@ where
                         self.bucket_map
                             .insert(*output, Buckets::new_filtered(buckets));
                     }
+                    MotionPolicy::Local => {
+                        let child_id = ir_plan.get_relational_child(node_id, 0)?;
+                        let child_buckets = self
+                            .bucket_map
+                            .get(&ir_plan.get_relational_output(child_id)?)
+                            .ok_or_else(|| {
+                                SbroadError::FailedTo(
+                                    Action::Retrieve,
+                                    Some(Entity::Buckets),
+                                    "of the child from the bucket map.".to_smolstr(),
+                                )
+                            })?
+                            .clone();
+                        self.bucket_map.insert(*output, child_buckets);
+                    }
                     MotionPolicy::LocalSegment(_) => {
                         // See `dispatch` method in `src/executor.rs` in order to understand when
                         // `virtual_table` materialized and when not for LocalSegment.
diff --git a/sbroad-core/src/executor/bucket/tests.rs b/sbroad-core/src/executor/bucket/tests.rs
index 8b43ed12f..3c935089b 100644
--- a/sbroad-core/src/executor/bucket/tests.rs
+++ b/sbroad-core/src/executor/bucket/tests.rs
@@ -1,10 +1,10 @@
 use pretty_assertions::assert_eq;
 use std::collections::HashSet;
 
+use crate::collection;
 use crate::executor::bucket::Buckets;
 use crate::executor::engine::mock::RouterRuntimeMock;
 use crate::executor::engine::Vshard;
-
 use crate::executor::Query;
 use crate::ir::helpers::RepeatableState;
 use crate::ir::value::Value;
@@ -460,3 +460,29 @@ fn global_tbl_groupby() {
 
     assert_eq!(Buckets::Any, buckets);
 }
+
+#[test]
+fn update_local() {
+    let query = r#"update t set c = 1 where (a, b) = (1, 1)"#;
+
+    let coordinator = RouterRuntimeMock::new();
+    let mut query = Query::new(&coordinator, query, vec![]).unwrap();
+    let plan = query.exec_plan.get_ir_plan();
+    let top = plan.get_top().unwrap();
+    let buckets = query.bucket_discovery(top).unwrap();
+
+    assert_eq!(Buckets::Filtered(collection!(6691)), buckets);
+}
+
+#[test]
+fn delete_local() {
+    let query = r#"delete from t where (a, b) = (1, 1)"#;
+
+    let coordinator = RouterRuntimeMock::new();
+    let mut query = Query::new(&coordinator, query, vec![]).unwrap();
+    let plan = query.exec_plan.get_ir_plan();
+    let top = plan.get_top().unwrap();
+    let buckets = query.bucket_discovery(top).unwrap();
+
+    assert_eq!(Buckets::Filtered(collection!(6691)), buckets);
+}
-- 
GitLab