Skip to content
Snippets Groups Projects
Verified Commit 8507c789 authored by Denis Smirnov's avatar Denis Smirnov
Browse files

fix: bucket discovery for local motions


Previously, during bucket discovery, we used Buckets::Any for
Motion(Local) nodes. This caused DML queries to be executed on all
nodes instead of targeting specific bucket children. We now apply
Motion(Local) only in the following cases:

- update/delete. When materializing the reading subtree for DML
  operations, Buckets::Any was used, but the reason for this is
  unclear.
- union all between sharded and local tables. To prevent duplicates,
  we materialize the global subtree only on a single storage node.
  Consequently, the subtree with Motion(Local) must have the same
  buckets as its child (the child node will always have Buckets::Any).

Co-authored-by: default avatarArseniy Volynets <a.volynets@picodata.io>
parent d5aa241e
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -335,7 +335,7 @@ where
output,
..
}) => match policy {
MotionPolicy::Full | MotionPolicy::Local => {
MotionPolicy::Full => {
self.bucket_map.insert(*output, Buckets::Any);
}
MotionPolicy::Segment(_) => {
......@@ -348,6 +348,21 @@ where
self.bucket_map
.insert(*output, Buckets::new_filtered(buckets));
}
MotionPolicy::Local => {
let child_id = ir_plan.get_relational_child(node_id, 0)?;
let child_buckets = self
.bucket_map
.get(&ir_plan.get_relational_output(child_id)?)
.ok_or_else(|| {
SbroadError::FailedTo(
Action::Retrieve,
Some(Entity::Buckets),
"of the child from the bucket map.".to_smolstr(),
)
})?
.clone();
self.bucket_map.insert(*output, child_buckets);
}
MotionPolicy::LocalSegment(_) => {
// See `dispatch` method in `src/executor.rs` in order to understand when
// `virtual_table` materialized and when not for LocalSegment.
......
use pretty_assertions::assert_eq;
use std::collections::HashSet;
use crate::collection;
use crate::executor::bucket::Buckets;
use crate::executor::engine::mock::RouterRuntimeMock;
use crate::executor::engine::Vshard;
use crate::executor::Query;
use crate::ir::helpers::RepeatableState;
use crate::ir::value::Value;
......@@ -460,3 +460,29 @@ fn global_tbl_groupby() {
assert_eq!(Buckets::Any, buckets);
}
#[test]
fn update_local() {
let query = r#"update t set c = 1 where (a, b) = (1, 1)"#;
let coordinator = RouterRuntimeMock::new();
let mut query = Query::new(&coordinator, query, vec![]).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
assert_eq!(Buckets::Filtered(collection!(6691)), buckets);
}
#[test]
fn delete_local() {
let query = r#"delete from t where (a, b) = (1, 1)"#;
let coordinator = RouterRuntimeMock::new();
let mut query = Query::new(&coordinator, query, vec![]).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
assert_eq!(Buckets::Filtered(collection!(6691)), buckets);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment