From 6c32a532198ff7ab1d83c23159101e3e3d0d0021 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Wed, 30 Mar 2022 18:09:08 +0700
Subject: [PATCH] feat: composite keys and executor refactoring

Now we can eliminate buckets with multi-column (i.e. composite)
keys in equalities. Previous implementation was incorrect, so
we have to make an executor refactoring as well.
---
 src/executor.rs                               |  94 +++--
 src/executor/bucket.rs                        | 372 ++++++++++++++++++
 src/executor/bucket/tests.rs                  | 118 ++++++
 src/executor/engine.rs                        |   7 +-
 src/executor/engine/cartridge.rs              | 193 ++++-----
 .../engine/cartridge/backend/sql/ir.rs        |   4 +-
 .../engine/cartridge/backend/sql/ir/tests.rs  |  32 +-
 .../engine/cartridge/backend/sql/tree.rs      |  35 +-
 .../cartridge/backend/sql/tree/tests.rs       |  11 +-
 src/executor/engine/cartridge/bucket.rs       |  19 -
 src/executor/engine/cartridge/hash.rs         |  12 +
 .../cartridge/{bucket => hash}/tests.rs       |   0
 src/executor/engine/mock.rs                   | 125 +++---
 src/executor/ir.rs                            |  19 +-
 src/executor/shard/tests.rs                   | 106 -----
 src/executor/tests.rs                         | 150 +++----
 src/executor/vtable.rs                        |   4 +-
 src/frontend/sql/ir/tests.rs                  |  32 +-
 src/ir/transformation/bool_in/tests.rs        |  12 +-
 src/ir/transformation/dnf.rs                  |  25 +-
 src/ir/transformation/dnf/tests.rs            |  20 +-
 src/ir/transformation/equality_propagation.rs |   3 -
 src/ir/transformation/merge_tuples/tests.rs   |  20 +-
 src/ir/transformation/split_columns/tests.rs  |  16 +-
 src/ir/value.rs                               |  11 +
 src/parser.rs                                 |   4 -
 26 files changed, 916 insertions(+), 528 deletions(-)
 create mode 100644 src/executor/bucket.rs
 create mode 100644 src/executor/bucket/tests.rs
 delete mode 100644 src/executor/engine/cartridge/bucket.rs
 create mode 100644 src/executor/engine/cartridge/hash.rs
 rename src/executor/engine/cartridge/{bucket => hash}/tests.rs (100%)
 delete mode 100644 src/executor/shard/tests.rs

diff --git a/src/executor.rs b/src/executor.rs
index f7bf0d0b1f..4c22049914 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,86 +1,98 @@
 use crate::errors::QueryPlannerError;
+use crate::executor::bucket::Buckets;
 use crate::executor::engine::Engine;
 pub use crate::executor::engine::Metadata;
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::result::BoxExecuteFormat;
 use crate::frontend::sql::ast::AbstractSyntaxTree;
 use crate::ir::Plan;
+use std::collections::HashMap;
 
+mod bucket;
 pub mod engine;
 pub(crate) mod ir;
 mod result;
-mod shard;
 mod vtable;
 
+impl Plan {
+    /// Apply optimization rules to the plan.
+    fn optimize(&mut self) -> Result<(), QueryPlannerError> {
+        self.replace_in_operator()?;
+        self.split_columns()?;
+        self.set_dnf()?;
+        // TODO: make it a plan method and rename to "derive_equalities()".
+        self.nodes.add_new_equalities()?;
+        self.merge_tuples()?;
+        self.add_motions()?;
+        Ok(())
+    }
+}
+
 /// Query object for executing
-#[allow(dead_code)]
 pub struct Query<T>
 where
     T: Engine,
 {
-    /// Query IR
-    plan: Plan,
-    /// Execute engine object
+    /// Execution plan
+    exec_plan: ExecutionPlan,
+    /// Execution engine
     engine: T,
+    /// Bucket map
+    bucket_map: HashMap<usize, Buckets>,
 }
 
-#[allow(dead_code)]
 impl<T> Query<T>
 where
     T: Engine,
 {
-    /// Create query object
+    /// Create a new query.
     ///
     /// # Errors
-    /// - query isn't valid or not support yet.
+    /// - Failed to parse SQL.
+    /// - Failed to build AST.
+    /// - Failed to build IR plan.
+    /// - Failed to apply optimizing transformations to IR plan.
     pub fn new(engine: T, sql: &str) -> Result<Self, QueryPlannerError>
     where
         T::Metadata: Metadata,
     {
         let ast = AbstractSyntaxTree::new(sql)?;
-        Ok(Query {
-            plan: ast.to_ir(&engine.metadata())?,
+        let mut plan = ast.to_ir(engine.metadata())?;
+        plan.optimize()?;
+        let query = Query {
+            exec_plan: ExecutionPlan::from(plan),
             engine,
-        })
+            bucket_map: HashMap::new(),
+        };
+        Ok(query)
     }
 
-    /// Execute query in cluster
+    /// Execute distributed query.
     ///
     /// # Errors
-    /// - query can't be executed in cluster
-    /// - invalid bucket id
-    pub fn exec(&self) -> Result<BoxExecuteFormat, QueryPlannerError> {
-        let mut exec_plan = ExecutionPlan::from(&self.plan);
-
-        let ir_plan = exec_plan.get_ir_plan();
-        if let Some(slices) = ir_plan.get_slices() {
-            for motion_level in slices {
-                for motion_id in motion_level {
+    /// - Failed to get a motion subtree.
+    /// - Failed to discover buckets.
+    /// - Failed to materialize motion result and build a virtual table.
+    /// - Failed to get plan top.
+    pub fn exec(&mut self) -> Result<BoxExecuteFormat, QueryPlannerError> {
+        let slices = self.exec_plan.get_ir_plan().get_slices();
+        if let Some(slices) = slices {
+            for slice in slices {
+                for motion_id in slice {
                     // TODO: make it work in parallel
-                    let vtable = self.engine.materialize_motion(&mut exec_plan, motion_id)?;
-                    exec_plan.add_motion_result(motion_id, vtable)?;
+                    let top_id = self.exec_plan.get_motion_subtree_root(motion_id)?;
+                    let buckets = self.bucket_discovery(top_id)?;
+                    let virtual_table =
+                        self.engine
+                            .materialize_motion(&mut self.exec_plan, motion_id, &buckets)?;
+                    self.exec_plan.add_motion_result(motion_id, virtual_table)?;
                 }
             }
         }
 
-        let top = exec_plan.get_ir_plan().get_top()?;
-
-        self.engine.exec(&mut exec_plan, top)
-    }
-
-    /// Apply optimize rules
-    ///
-    /// # Errors
-    /// - transformation can't be applied
-    pub fn optimize(&mut self) -> Result<(), QueryPlannerError> {
-        self.plan.replace_in_operator()?;
-        self.plan.split_columns()?;
-        self.plan.set_dnf()?;
-        // TODO: make it a plan method and rename to "derive_equalities()".
-        self.plan.nodes.add_new_equalities()?;
-        self.plan.merge_tuples()?;
-        self.plan.add_motions()?;
-        Ok(())
+        let top_id = self.exec_plan.get_ir_plan().get_top()?;
+        let buckets = self.bucket_discovery(top_id)?;
+        self.engine.exec(&mut self.exec_plan, top_id, &buckets)
     }
 }
 
diff --git a/src/executor/bucket.rs b/src/executor/bucket.rs
new file mode 100644
index 0000000000..1aa9d752db
--- /dev/null
+++ b/src/executor/bucket.rs
@@ -0,0 +1,372 @@
+use crate::errors::QueryPlannerError;
+use crate::executor::engine::Engine;
+use crate::executor::Query;
+use crate::ir::distribution::Distribution;
+use crate::ir::expression::Expression;
+use crate::ir::operator::{Bool, Relational};
+use crate::ir::transformation::redistribution::MotionPolicy;
+use std::collections::HashSet;
+use traversal::DftPost;
+
+/// Buckets are used to determine which nodes to send the query to.
+#[derive(Clone, Debug, PartialEq)]
+pub enum Buckets {
+    // We don't want to keep thousands of buckets in memory
+    // so we use a special enum to represent all the buckets
+    // in a cluster.
+    All,
+    // A filtered set of buckets.
+    Filtered(HashSet<u64>),
+}
+
+impl Buckets {
+    /// Get all buckets in the cluster.
+    pub fn new_all() -> Self {
+        Buckets::All
+    }
+
+    /// Get a filtered set of buckets.
+    pub fn new_filtered(buckets: HashSet<u64>) -> Self {
+        Buckets::Filtered(buckets)
+    }
+
+    /// Disjunction of two sets of buckets.
+    pub fn disjunct(&self, buckets: &Buckets) -> Buckets {
+        match (self, buckets) {
+            (Buckets::All, Buckets::All) => Buckets::All,
+            (Buckets::Filtered(b), Buckets::All) | (Buckets::All, Buckets::Filtered(b)) => {
+                Buckets::Filtered(b.clone())
+            }
+            (Buckets::Filtered(a), Buckets::Filtered(b)) => {
+                Buckets::Filtered(a.intersection(b).copied().collect())
+            }
+        }
+    }
+
+    /// Conjunction of two sets of buckets.
+    pub fn conjunct(&self, buckets: &Buckets) -> Buckets {
+        match (self, buckets) {
+            (Buckets::All, _) | (_, Buckets::All) => Buckets::All,
+            (Buckets::Filtered(a), Buckets::Filtered(b)) => {
+                Buckets::Filtered(a.union(b).copied().collect())
+            }
+        }
+    }
+}
+
+impl<T> Query<T>
+where
+    T: Engine,
+{
+    fn get_buckets_from_expr(&self, expr_id: usize) -> Result<Buckets, QueryPlannerError> {
+        let mut buckets: Vec<Buckets> = Vec::new();
+        let ir_plan = self.exec_plan.get_ir_plan();
+        let expr = ir_plan.get_expression_node(expr_id)?;
+        if let Expression::Bool {
+            op: Bool::Eq | Bool::In,
+            left,
+            right,
+            ..
+        } = expr
+        {
+            let pairs = vec![(*left, *right), (*right, *left)];
+            for (left_id, right_id) in pairs {
+                let left_expr = ir_plan.get_expression_node(left_id)?;
+                if !left_expr.is_row() {
+                    return Err(QueryPlannerError::CustomError(format!(
+                        "Left side of equality expression is not a row: {:?}",
+                        left_expr
+                    )));
+                }
+                let right_expr = ir_plan.get_expression_node(right_id)?;
+                let right_columns = if let Expression::Row { list, .. } = right_expr {
+                    list.clone()
+                } else {
+                    return Err(QueryPlannerError::CustomError(format!(
+                        "Right side of equality expression is not a row: {:?}",
+                        right_expr
+                    )));
+                };
+
+                // Get the distribution of the left row.
+                let left_dist = ir_plan.get_distribution(left_id)?;
+
+                // Gather buckets from the right row.
+                if let Distribution::Segment { keys } = left_dist {
+                    // If the right side is a row referencing the motion
+                    // it means that the corresponding virtual table contains
+                    // a tuple with the same distribution as the left side.
+                    if let Some(motion_id) = ir_plan.get_motion_from_row(right_id)? {
+                        let virtual_table = self.exec_plan.get_motion_vtable(motion_id)?;
+                        let hashed_keys = virtual_table.get_tuple_distribution().keys();
+                        let mut bucket_ids: HashSet<u64> = HashSet::new();
+                        for bucket_str in hashed_keys {
+                            bucket_ids.insert(self.engine.determine_bucket_id(bucket_str));
+                        }
+                        if !bucket_ids.is_empty() {
+                            buckets.push(Buckets::new_filtered(bucket_ids));
+                        }
+                    }
+
+                    // The right side is a regular row with constants
+                    // on the positions of the left keys (if we are lucky).
+                    for key in keys {
+                        let mut values: Vec<String> = Vec::new();
+                        for position in &key.positions {
+                            let right_column_id =
+                                *right_columns.get(*position).ok_or_else(|| {
+                                    QueryPlannerError::CustomError(format!(
+                                        "Right row does not have column at position {}",
+                                        position
+                                    ))
+                                })?;
+                            let right_column_expr = ir_plan.get_expression_node(right_column_id)?;
+                            if let Expression::Constant { .. } = right_column_expr {
+                                values.push(right_column_expr.get_const_value()?.into());
+                            } else {
+                                // One of the columns is not a constant. Skip this key.
+                                values = Vec::new();
+                                break;
+                            }
+                        }
+                        if !values.is_empty() {
+                            let bucket_str = values.join("");
+                            let bucket = self.engine.determine_bucket_id(&bucket_str);
+                            buckets.push(Buckets::new_filtered([bucket].into()));
+                        }
+                    }
+                }
+            }
+        }
+
+        if buckets.is_empty() {
+            Ok(Buckets::new_all())
+        } else {
+            Ok(buckets
+                .into_iter()
+                .fold(Buckets::new_all(), |a, b| a.disjunct(&b)))
+        }
+    }
+
+    fn get_expression_tree_buckets(&self, expr_id: usize) -> Result<Buckets, QueryPlannerError> {
+        let ir_plan = self.exec_plan.get_ir_plan();
+        let chains = ir_plan.get_dnf_chains(expr_id)?;
+        let mut result: Vec<Buckets> = Vec::new();
+        for mut chain in chains {
+            let mut chain_buckets = Buckets::new_all();
+            let nodes = chain.get_mut_nodes();
+            // Nodes in the chain are in the top-down order (from left to right).
+            // We need to pop back the chain to get nodes in the bottom-up order.
+            while let Some(node_id) = nodes.pop_back() {
+                let node_buckets = self.get_buckets_from_expr(node_id)?;
+                chain_buckets = chain_buckets.disjunct(&node_buckets);
+            }
+            result.push(chain_buckets);
+        }
+
+        if let Some((first, other)) = result.split_first_mut() {
+            for buckets in other {
+                *first = first.conjunct(buckets);
+            }
+            return Ok(first.clone());
+        }
+
+        Ok(Buckets::All)
+    }
+
+    /// Discover required buckets to execute the query subtree.
+    ///
+    /// # Errors
+    /// - Relational iterator returns non-relational nodes.
+    /// - Failed to find a virtual table.
+    /// - Relational nodes contain invalid children.
+    #[allow(clippy::too_many_lines)]
+    pub fn bucket_discovery(&mut self, top_id: usize) -> Result<Buckets, QueryPlannerError> {
+        let mut nodes: Vec<usize> = Vec::new();
+        let ir_plan = self.exec_plan.get_ir_plan();
+        let rel_tree = DftPost::new(&top_id, |node| ir_plan.nodes.rel_iter(node));
+        for (_, node_id) in rel_tree {
+            nodes.push(*node_id);
+        }
+
+        for node_id in nodes {
+            if self.bucket_map.get(&node_id).is_some() {
+                continue;
+            }
+
+            let rel = self.exec_plan.get_ir_plan().get_relation_node(node_id)?;
+            match rel {
+                Relational::ScanRelation { output, .. } => {
+                    self.bucket_map.insert(*output, Buckets::new_all());
+                }
+                Relational::Motion { policy, output, .. } => match policy {
+                    MotionPolicy::Full => {
+                        self.bucket_map.insert(*output, Buckets::new_all());
+                    }
+                    MotionPolicy::Segment(_) => {
+                        let virtual_table = self.exec_plan.get_motion_vtable(node_id)?;
+                        let mut buckets: HashSet<u64> = HashSet::new();
+                        for key in virtual_table.get_tuple_distribution().keys() {
+                            let bucket = self.engine.determine_bucket_id(key);
+                            buckets.insert(bucket);
+                        }
+                        self.bucket_map
+                            .insert(*output, Buckets::new_filtered(buckets));
+                    }
+                    MotionPolicy::Local => {
+                        return Err(QueryPlannerError::CustomError(
+                            "Local motion policy should never appear in the plan".to_string(),
+                        ));
+                    }
+                },
+                Relational::Projection {
+                    children, output, ..
+                }
+                | Relational::ScanSubQuery {
+                    children, output, ..
+                } => {
+                    let child_id = children.first().ok_or_else(|| {
+                        QueryPlannerError::CustomError(
+                            "Current node should have exactly one child".to_string(),
+                        )
+                    })?;
+                    let child_rel = self.exec_plan.get_ir_plan().get_relation_node(*child_id)?;
+                    let child_buckets = self
+                        .bucket_map
+                        .get(&child_rel.output())
+                        .ok_or_else(|| {
+                            QueryPlannerError::CustomError(
+                                "Failed to retrieve buckets of the child from the bucket map."
+                                    .to_string(),
+                            )
+                        })?
+                        .clone();
+                    self.bucket_map.insert(*output, child_buckets);
+                }
+                Relational::UnionAll {
+                    children, output, ..
+                } => {
+                    if let (Some(first_id), Some(second_id), None) =
+                        (children.first(), children.get(1), children.get(2))
+                    {
+                        let first_rel =
+                            self.exec_plan.get_ir_plan().get_relation_node(*first_id)?;
+                        let second_rel =
+                            self.exec_plan.get_ir_plan().get_relation_node(*second_id)?;
+                        let first_buckets = self.bucket_map.get(&first_rel.output()).ok_or_else(|| {
+                            QueryPlannerError::CustomError(
+                                "Failed to retrieve buckets of the first union all child from the bucket map."
+                                    .to_string(),
+                            )
+                        })?;
+                        let second_buckets = self.bucket_map.get(&second_rel.output()).ok_or_else(|| {
+                            QueryPlannerError::CustomError(
+                                "Failed to retrieve buckets of the second union all child from the bucket map."
+                                    .to_string(),
+                            )
+                        })?;
+                        let buckets = first_buckets.disjunct(second_buckets);
+                        self.bucket_map.insert(*output, buckets);
+                    } else {
+                        return Err(QueryPlannerError::CustomError(
+                            "Current node should have exactly two children".to_string(),
+                        ));
+                    }
+                }
+                Relational::Selection {
+                    children,
+                    filter,
+                    output,
+                    ..
+                } => {
+                    // We need to get the buckets of the child node for the case
+                    // when the filter returns no buckets to reduce.
+                    let child_id = children.first().ok_or_else(|| {
+                        QueryPlannerError::CustomError(
+                            "Current node should have exactly one child".to_string(),
+                        )
+                    })?;
+                    let child_rel = self.exec_plan.get_ir_plan().get_relation_node(*child_id)?;
+                    let child_buckets = self
+                        .bucket_map
+                        .get(&child_rel.output())
+                        .ok_or_else(|| {
+                            QueryPlannerError::CustomError(
+                            "Failed to retrieve buckets of the selection child from the bucket map."
+                                .to_string(),
+                        )
+                        })?
+                        .clone();
+                    let output_id = *output;
+                    let filter_id = *filter;
+                    let filter_buckets = self.get_expression_tree_buckets(filter_id)?;
+                    self.bucket_map
+                        .insert(output_id, child_buckets.disjunct(&filter_buckets));
+                }
+                Relational::InnerJoin {
+                    children,
+                    condition,
+                    output,
+                    ..
+                } => {
+                    if let (Some(inner_id), Some(outer_id)) = (children.first(), children.get(1)) {
+                        let inner_rel =
+                            self.exec_plan.get_ir_plan().get_relation_node(*inner_id)?;
+                        let outer_rel =
+                            self.exec_plan.get_ir_plan().get_relation_node(*outer_id)?;
+                        let inner_buckets = self
+                            .bucket_map
+                            .get(&inner_rel.output())
+                            .ok_or_else(|| {
+                                QueryPlannerError::CustomError(
+                                "Failed to retrieve buckets of the inner child from the bucket map."
+                                    .to_string(),
+                            )
+                            })?
+                            .clone();
+                        let outer_buckets = self
+                            .bucket_map
+                            .get(&outer_rel.output())
+                            .ok_or_else(|| {
+                                QueryPlannerError::CustomError(
+                                "Failed to retrieve buckets of the outer child from the bucket map."
+                                .to_string(),
+                            )
+                            })?
+                            .clone();
+                        let output_id = *output;
+                        let condition_id = *condition;
+                        let filter_buckets = self.get_expression_tree_buckets(condition_id)?;
+                        self.bucket_map.insert(
+                            output_id,
+                            inner_buckets
+                                .conjunct(&outer_buckets)
+                                .disjunct(&filter_buckets),
+                        );
+                    } else {
+                        return Err(QueryPlannerError::CustomError(
+                            "Current node should have at least two children".to_string(),
+                        ));
+                    }
+                }
+            }
+        }
+
+        let top_rel = self.exec_plan.get_ir_plan().get_relation_node(top_id)?;
+        let top_buckets = self
+            .bucket_map
+            .get(&top_rel.output())
+            .ok_or_else(|| {
+                QueryPlannerError::CustomError(
+                    "Failed to retrieve buckets of the top relation from the bucket map."
+                        .to_string(),
+                )
+            })?
+            .clone();
+
+        Ok(top_buckets)
+    }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/src/executor/bucket/tests.rs b/src/executor/bucket/tests.rs
new file mode 100644
index 0000000000..dc8e0fb01c
--- /dev/null
+++ b/src/executor/bucket/tests.rs
@@ -0,0 +1,118 @@
+use pretty_assertions::assert_eq;
+
+use crate::executor::bucket::Buckets;
+use crate::executor::engine::mock::EngineMock;
+use crate::executor::engine::Engine;
+use crate::executor::Query;
+
+#[test]
+fn simple_union_query() {
+    let query = r#"SELECT * FROM (
+    SELECT * FROM "test_space" WHERE "sysFrom" > 0
+    UNION ALL
+    SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0
+    ) as "t3"
+    WHERE "id" = 1"#;
+
+    let engine = EngineMock::new();
+    let mut query = Query::new(engine, query).unwrap();
+    let plan = query.exec_plan.get_ir_plan();
+    let top = plan.get_top().unwrap();
+    let buckets = query.bucket_discovery(top).unwrap();
+
+    let bucket1 = query.engine.determine_bucket_id("1");
+    let expected = Buckets::new_filtered([bucket1].into());
+
+    assert_eq!(expected, buckets);
+}
+
+#[test]
+fn simple_disjunction_in_union_query() {
+    let query = r#"SELECT * FROM (
+    SELECT * FROM "test_space" WHERE "sysFrom" > 0
+    UNION ALL
+    SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0
+    ) as "t3"
+    WHERE ("id" = 1) OR ("id" = 100)"#;
+
+    let engine = EngineMock::new();
+    let mut query = Query::new(engine, query).unwrap();
+    let plan = query.exec_plan.get_ir_plan();
+    let top = plan.get_top().unwrap();
+    let buckets = query.bucket_discovery(top).unwrap();
+
+    let bucket1 = query.engine.determine_bucket_id("1");
+    let bucket100 = query.engine.determine_bucket_id("100");
+    let expected = Buckets::new_filtered([bucket1, bucket100].into());
+
+    assert_eq!(expected, buckets);
+}
+
+#[test]
+fn complex_shard_key_union_query() {
+    let query = r#"SELECT *
+    FROM
+        (SELECT "identification_number", "product_code"
+        FROM "hash_testing"
+        WHERE "sys_op" = 1
+        UNION ALL
+        SELECT "identification_number", "product_code"
+        FROM "hash_testing_hist"
+        WHERE "sys_op" > 1) AS "t3"
+    WHERE "identification_number" = 1 AND "product_code" = '222'"#;
+
+    let engine = EngineMock::new();
+    let mut query = Query::new(engine, query).unwrap();
+    let plan = query.exec_plan.get_ir_plan();
+    let top = plan.get_top().unwrap();
+    let buckets = query.bucket_discovery(top).unwrap();
+
+    let bucket = query.engine.determine_bucket_id(&["1", "222"].join(""));
+    let expected = Buckets::new_filtered([bucket].into());
+
+    assert_eq!(expected, buckets);
+}
+
+#[test]
+fn union_complex_cond_query() {
+    let query = r#"SELECT *
+    FROM
+        (SELECT "identification_number", "product_code"
+        FROM "hash_testing"
+        WHERE "sys_op" = 1
+        UNION ALL
+        SELECT "identification_number", "product_code"
+        FROM "hash_testing_hist"
+        WHERE "sys_op" > 1) AS "t3"
+    WHERE ("identification_number" = 1
+        OR ("identification_number" = 100
+        OR "identification_number" = 1000))
+        AND ("product_code" = '222'
+        OR "product_code" = '111')"#;
+
+    let engine = EngineMock::new();
+    let mut query = Query::new(engine, query).unwrap();
+    let plan = query.exec_plan.get_ir_plan();
+    let top = plan.get_top().unwrap();
+    let buckets = query.bucket_discovery(top).unwrap();
+
+    let bucket1222 = query.engine.determine_bucket_id(&["1", "222"].join(""));
+    let bucket100222 = query.engine.determine_bucket_id(&["100", "222"].join(""));
+    let bucket1000222 = query.engine.determine_bucket_id(&["1000", "222"].join(""));
+    let bucket1111 = query.engine.determine_bucket_id(&["1", "111"].join(""));
+    let bucket100111 = query.engine.determine_bucket_id(&["100", "111"].join(""));
+    let bucket1000111 = query.engine.determine_bucket_id(&["1000", "111"].join(""));
+    let expected = Buckets::new_filtered(
+        [
+            bucket1222,
+            bucket100222,
+            bucket1000222,
+            bucket1111,
+            bucket100111,
+            bucket1000111,
+        ]
+        .into(),
+    );
+
+    assert_eq!(expected, buckets);
+}
diff --git a/src/executor/engine.rs b/src/executor/engine.rs
index 670be1c336..9b554bdee8 100644
--- a/src/executor/engine.rs
+++ b/src/executor/engine.rs
@@ -1,4 +1,5 @@
 use crate::errors::QueryPlannerError;
+use crate::executor::bucket::Buckets;
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::result::BoxExecuteFormat;
 use crate::executor::vtable::VirtualTable;
@@ -28,7 +29,7 @@ pub trait Engine {
     type Metadata;
 
     /// Return object of metadata storage
-    fn metadata(&self) -> Self::Metadata
+    fn metadata(&self) -> &Self::Metadata
     where
         Self: Sized;
 
@@ -49,6 +50,7 @@ pub trait Engine {
         &self,
         plan: &mut ExecutionPlan,
         motion_node_id: usize,
+        buckets: &Buckets,
     ) -> Result<VirtualTable, QueryPlannerError>;
 
     /// Execute sql query on the all shards in cluster
@@ -59,10 +61,11 @@ pub trait Engine {
         &self,
         plan: &mut ExecutionPlan,
         top_id: usize,
+        buckets: &Buckets,
     ) -> Result<BoxExecuteFormat, QueryPlannerError>;
 
     /// Determine shard for query execution by sharding key value
-    fn determine_bucket_id(&self, s: &str) -> usize;
+    fn determine_bucket_id(&self, s: &str) -> u64;
 }
 
 #[cfg(test)]
diff --git a/src/executor/engine/cartridge.rs b/src/executor/engine/cartridge.rs
index 2d2b5fe233..b46b4bf182 100644
--- a/src/executor/engine/cartridge.rs
+++ b/src/executor/engine/cartridge.rs
@@ -4,16 +4,17 @@ use tarantool::log::{say, SayLevel};
 use tarantool::tlua::LuaFunction;
 
 use crate::errors::QueryPlannerError;
-use crate::executor::engine::cartridge::bucket::str_to_bucket_id;
+use crate::executor::bucket::Buckets;
 use crate::executor::engine::cartridge::cache::ClusterSchema;
+use crate::executor::engine::cartridge::hash::str_to_bucket_id;
 use crate::executor::engine::Engine;
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::result::BoxExecuteFormat;
 use crate::executor::vtable::VirtualTable;
 
 mod backend;
-pub mod bucket;
 pub mod cache;
+pub mod hash;
 
 #[derive(Debug, Clone)]
 pub struct Runtime {
@@ -25,8 +26,8 @@ pub struct Runtime {
 impl Engine for Runtime {
     type Metadata = ClusterSchema;
 
-    fn metadata(&self) -> Self::Metadata {
-        self.metadata.clone()
+    fn metadata(&self) -> &Self::Metadata {
+        &self.metadata
     }
 
     fn has_metadata(&self) -> bool {
@@ -64,42 +65,36 @@ impl Engine for Runtime {
         &self,
         plan: &mut ExecutionPlan,
         top_id: usize,
+        buckets: &Buckets,
     ) -> Result<BoxExecuteFormat, QueryPlannerError> {
         let mut result = BoxExecuteFormat::new();
         let sql = plan.subtree_as_sql(top_id)?;
 
-        say(
-            SayLevel::Debug,
-            file!(),
-            line!().try_into().unwrap_or(0),
-            Option::from("exec"),
-            &format!("exec query: {:?}", sql),
-        );
-
-        if let Some(shard_keys) = plan.discovery(top_id)? {
-            // sending query to nodes
-            for shard in shard_keys {
-                // exec query on node
-                let temp_result = self.exec_query(&shard, &sql)?;
-                result.extend(temp_result)?;
+        match buckets {
+            Buckets::All => {
+                result.extend(cluster_exec_query(&sql)?)?;
+            }
+            Buckets::Filtered(list) => {
+                for bucket in list {
+                    let temp_result = bucket_exec_query(*bucket, &sql)?;
+                    result.extend(temp_result)?;
+                }
             }
-        } else {
-            let temp_result = mp_exec_query(&sql)?;
-
-            result.extend(temp_result)?;
         }
+
         Ok(result)
     }
 
-    /// Transform sub query result to virtual table
+    /// Transform sub query results into a virtual table.
     fn materialize_motion(
         &self,
         plan: &mut ExecutionPlan,
         motion_node_id: usize,
+        buckets: &Buckets,
     ) -> Result<VirtualTable, QueryPlannerError> {
         let top = &plan.get_motion_subtree_root(motion_node_id)?;
 
-        let result = self.exec(plan, *top)?;
+        let result = self.exec(plan, *top, buckets)?;
         let mut vtable = result.as_virtual_table()?;
 
         if let Some(name) = &plan.get_motion_alias(motion_node_id)? {
@@ -109,8 +104,8 @@ impl Engine for Runtime {
         Ok(vtable)
     }
 
-    /// Calculation ``bucket_id`` function
-    fn determine_bucket_id(&self, s: &str) -> usize {
+    /// Calculate bucket for a key.
+    fn determine_bucket_id(&self, s: &str) -> u64 {
         str_to_bucket_id(s, self.bucket_count)
     }
 }
@@ -174,93 +169,79 @@ impl Runtime {
 
         Ok(())
     }
+}
 
-    /// Function execute sql query on selected node
-    fn exec_query(
-        &self,
-        shard_key: &str,
-        query: &str,
-    ) -> Result<BoxExecuteFormat, QueryPlannerError> {
-        let cast_bucket_id: u64 = match self.determine_bucket_id(shard_key).try_into() {
-            Ok(v) => v,
-            Err(_) => {
-                return Err(QueryPlannerError::CustomError("Invalid bucket id".into()));
-            }
-        };
-
-        say(
-            SayLevel::Debug,
-            file!(),
-            line!().try_into().unwrap_or(0),
-            None,
-            &format!(
-                "distribution keys is {:?} bucket {:?}",
-                shard_key, cast_bucket_id
-            ),
-        );
-
-        let lua = tarantool::lua_state();
-        match lua.exec(
-            r#"
-        local vshard = require('vshard')
-        local yaml = require('yaml')
-
-        function execute_sql(bucket_id, query)
-            local res, err = vshard.router.call(
-                bucket_id,
-                'read',
-                'box.execute',
-                { query }
-            )
-
-            if err ~= nil then
-                error(err)
-            end
+/// Send the query to a single bucket and return its result.
+fn bucket_exec_query(bucket: u64, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
+    say(
+        SayLevel::Debug,
+        file!(),
+        line!().try_into().unwrap_or(0),
+        None,
+        &format!("Execute a query {:?} on bucket {:?}", query, bucket),
+    );
 
-            return res
+    let lua = tarantool::lua_state();
+    match lua.exec(
+        r#"
+    local vshard = require('vshard')
+    local yaml = require('yaml')
+
+    function execute_sql(bucket_id, query)
+        local res, err = vshard.router.call(
+            bucket_id,
+            'read',
+            'box.execute',
+            { query }
+        )
+
+        if err ~= nil then
+            error(err)
         end
-    "#,
-        ) {
-            Ok(_) => {}
-            Err(e) => {
-                say(
-                    SayLevel::Error,
-                    file!(),
-                    line!().try_into().unwrap_or(0),
-                    Option::from("exec_query"),
-                    &format!("{:?}", e),
-                );
-                return Err(QueryPlannerError::LuaError(format!(
-                    "Failed lua code loading: {:?}",
-                    e
-                )));
-            }
+
+        return res
+    end
+"#,
+    ) {
+        Ok(_) => {}
+        Err(e) => {
+            say(
+                SayLevel::Error,
+                file!(),
+                line!().try_into().unwrap_or(0),
+                Option::from("exec_query"),
+                &format!("{:?}", e),
+            );
+            return Err(QueryPlannerError::LuaError(format!(
+                "Failed lua code loading: {:?}",
+                e
+            )));
         }
+    }
 
-        let exec_sql: LuaFunction<_> = lua.get("execute_sql").ok_or_else(|| {
-            QueryPlannerError::LuaError("Lua function `execute_sql` not found".into())
-        })?;
+    let exec_sql: LuaFunction<_> = lua.get("execute_sql").ok_or_else(|| {
+        QueryPlannerError::LuaError("Lua function `execute_sql` not found".into())
+    })?;
 
-        let res: BoxExecuteFormat = match exec_sql.call_with_args((cast_bucket_id, query)) {
-            Ok(v) => v,
-            Err(e) => {
-                say(
-                    SayLevel::Error,
-                    file!(),
-                    line!().try_into().unwrap_or(0),
-                    Option::from("exec_query"),
-                    &format!("{:?}", e),
-                );
-                return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e)));
-            }
-        };
+    let res: BoxExecuteFormat = match exec_sql.call_with_args((bucket, query)) {
+        Ok(v) => v,
+        Err(e) => {
+            say(
+                SayLevel::Error,
+                file!(),
+                line!().try_into().unwrap_or(0),
+                Option::from("exec_query"),
+                &format!("{:?}", e),
+            );
+            return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e)));
+        }
+    };
 
-        Ok(res)
-    }
+    Ok(res)
 }
 
-/// Sends query to all instances and merges results after (map-reduce).
-fn mp_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
+/// Send the query to all instances and merge results (map-reduce).
+fn cluster_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
     say(
         SayLevel::Debug,
         file!(),
@@ -316,7 +297,7 @@ fn mp_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
                 SayLevel::Error,
                 file!(),
                 line!().try_into().unwrap_or(0),
-                Option::from("mp_exec_query"),
+                Option::from("cluster_exec_query"),
                 &format!("{:?}", e),
             );
             return Err(QueryPlannerError::LuaError(format!(
@@ -337,7 +318,7 @@ fn mp_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
                 SayLevel::Error,
                 file!(),
                 line!().try_into().unwrap_or(0),
-                Option::from("mp_exec_query"),
+                Option::from("cluster_exec_query"),
                 &format!("{:?}", e),
             );
             return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e)));
diff --git a/src/executor/engine/cartridge/backend/sql/ir.rs b/src/executor/engine/cartridge/backend/sql/ir.rs
index f2cec5735b..ed0e641791 100644
--- a/src/executor/engine/cartridge/backend/sql/ir.rs
+++ b/src/executor/engine/cartridge/backend/sql/ir.rs
@@ -3,12 +3,12 @@ use itertools::Itertools;
 use crate::errors::QueryPlannerError;
 use crate::executor::ir::ExecutionPlan;
 use crate::ir::expression::Expression;
-use crate::ir::Node;
 use crate::ir::operator::Relational;
+use crate::ir::Node;
 
 use super::tree::{SyntaxData, SyntaxPlan};
 
-impl<'e> ExecutionPlan<'e> {
+impl ExecutionPlan {
     /// Traverse plan sub-tree (pointed by top) in the order
     /// convenient for SQL serialization.
     ///
diff --git a/src/executor/engine/cartridge/backend/sql/ir/tests.rs b/src/executor/engine/cartridge/backend/sql/ir/tests.rs
index 72c0a8635d..0623ede977 100644
--- a/src/executor/engine/cartridge/backend/sql/ir/tests.rs
+++ b/src/executor/engine/cartridge/backend/sql/ir/tests.rs
@@ -13,9 +13,9 @@ fn one_table_projection() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -38,9 +38,9 @@ fn one_table_with_asterisk() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -69,9 +69,9 @@ fn union_all() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -96,9 +96,9 @@ fn from_sub_query() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -128,9 +128,9 @@ fn from_sub_query_with_union() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -155,9 +155,9 @@ fn inner_join() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -182,9 +182,9 @@ fn inner_join_with_sq() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -209,8 +209,8 @@ fn selection_with_sq() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
-    let top_id = plan.get_top().unwrap();
+    let ex_plan = ExecutionPlan::from(plan);
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
diff --git a/src/executor/engine/cartridge/backend/sql/tree.rs b/src/executor/engine/cartridge/backend/sql/tree.rs
index 34fdfab598..8a8d665929 100644
--- a/src/executor/engine/cartridge/backend/sql/tree.rs
+++ b/src/executor/engine/cartridge/backend/sql/tree.rs
@@ -380,17 +380,14 @@ impl Select {
 /// A wrapper over original plan tree.
 /// We can modify it as we wish without any influence
 /// on the original plan tree.
-pub struct SyntaxPlan<'p, 'e> {
+pub struct SyntaxPlan<'p> {
     pub(crate) nodes: SyntaxNodes,
     top: Option<usize>,
-    plan: &'p ExecutionPlan<'e>,
+    plan: &'p ExecutionPlan,
 }
 
 #[allow(dead_code)]
-impl<'p, 'e> SyntaxPlan<'p, 'e>
-where
-    'e: 'p,
-{
+impl<'p> SyntaxPlan<'p> {
     #[allow(clippy::too_many_lines)]
     pub fn add_plan_node(&mut self, id: usize) -> Result<usize, QueryPlannerError> {
         let ir_plan = self.plan.get_ir_plan();
@@ -528,19 +525,17 @@ where
                     if let Some(motion_id) = ir_plan.get_motion_from_row(id)? {
                         // Replace motion node to virtual table node
                         let vtable = self.plan.get_motion_vtable(motion_id)?;
-                        if vtable.get_alias().is_none() {
-                            let sn = SyntaxNode::new_pointer(
-                                id,
-                                None,
-                                &[
-                                    self.nodes.push_syntax_node(SyntaxNode::new_open()),
-                                    self.nodes.push_syntax_node(SyntaxNode::new_vtable(vtable)),
-                                    self.nodes.push_syntax_node(SyntaxNode::new_close()),
-                                ],
-                            );
+                        let sn = SyntaxNode::new_pointer(
+                            id,
+                            None,
+                            &[
+                                self.nodes.push_syntax_node(SyntaxNode::new_open()),
+                                self.nodes.push_syntax_node(SyntaxNode::new_vtable(vtable)),
+                                self.nodes.push_syntax_node(SyntaxNode::new_close()),
+                            ],
+                        );
 
-                            return Ok(self.nodes.push_syntax_node(sn));
-                        }
+                        return Ok(self.nodes.push_syntax_node(sn));
                     }
 
                     if let Some(sq_id) = ir_plan.get_sub_query_from_row_node(id)? {
@@ -695,7 +690,7 @@ where
         Ok(())
     }
 
-    fn empty(plan: &'p ExecutionPlan<'e>) -> Self {
+    fn empty(plan: &'p ExecutionPlan) -> Self {
         SyntaxPlan {
             nodes: SyntaxNodes::new(),
             top: None,
@@ -703,7 +698,7 @@ where
         }
     }
 
-    pub fn new(plan: &'p ExecutionPlan<'e>, top: usize) -> Result<Self, QueryPlannerError> {
+    pub fn new(plan: &'p ExecutionPlan, top: usize) -> Result<Self, QueryPlannerError> {
         let mut sp = SyntaxPlan::empty(plan);
         let ir_plan = plan.get_ir_plan();
 
diff --git a/src/executor/engine/cartridge/backend/sql/tree/tests.rs b/src/executor/engine/cartridge/backend/sql/tree/tests.rs
index 3dbd4bf5b1..a8e0b5ff24 100644
--- a/src/executor/engine/cartridge/backend/sql/tree/tests.rs
+++ b/src/executor/engine/cartridge/backend/sql/tree/tests.rs
@@ -39,10 +39,11 @@ fn sql_order_selection() {
     let s = fs::read_to_string(path).unwrap();
     let expected_plan = Plan::from_yaml(&s).unwrap();
     assert_eq!(expected_plan, plan);
-    let exec_plan = ExecutionPlan::from(&plan);
+    let exec_plan = ExecutionPlan::from(plan.clone());
+    let top_id = exec_plan.get_ir_plan().get_top().unwrap();
 
     // test the syntax plan
-    let sp = SyntaxPlan::new(&exec_plan, plan.get_top().unwrap()).unwrap();
+    let sp = SyntaxPlan::new(&exec_plan, top_id).unwrap();
     let path = Path::new("")
         .join("tests")
         .join("artifactory")
@@ -53,10 +54,12 @@ fn sql_order_selection() {
     let s = fs::read_to_string(path).unwrap();
     let expected_syntax_nodes = SyntaxNodes::from_yaml(&s).unwrap();
     assert_eq!(expected_syntax_nodes, sp.nodes);
-    let exec_plan = ExecutionPlan::from(&plan);
+
+    let exec_plan = ExecutionPlan::from(plan);
+    let top_id = exec_plan.get_ir_plan().get_top().unwrap();
 
     // get nodes in the sql-convenient order
-    let nodes = exec_plan.get_sql_order(plan.get_top().unwrap()).unwrap();
+    let nodes = exec_plan.get_sql_order(top_id).unwrap();
     let mut nodes_iter = nodes.into_iter();
     assert_eq!(Some(SyntaxData::PlanId(16)), nodes_iter.next()); // projection
     assert_eq!(Some(SyntaxData::PlanId(13)), nodes_iter.next()); // ref
diff --git a/src/executor/engine/cartridge/bucket.rs b/src/executor/engine/cartridge/bucket.rs
deleted file mode 100644
index fd59adf9b4..0000000000
--- a/src/executor/engine/cartridge/bucket.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use std::convert::TryInto;
-use std::hash::Hasher;
-
-use fasthash::{murmur3::Hasher32, FastHasher};
-
-/// Determine bucket value using `murmur3` hash function
-pub(in crate::executor::engine::cartridge) fn str_to_bucket_id(
-    s: &str,
-    bucket_count: usize,
-) -> usize {
-    let mut hash = Hasher32::new();
-    hash.write(s.as_bytes());
-
-    let hash: usize = hash.finish().try_into().unwrap();
-    hash % bucket_count + 1
-}
-
-#[cfg(test)]
-mod tests;
diff --git a/src/executor/engine/cartridge/hash.rs b/src/executor/engine/cartridge/hash.rs
new file mode 100644
index 0000000000..d747f1d523
--- /dev/null
+++ b/src/executor/engine/cartridge/hash.rs
@@ -0,0 +1,12 @@
+use fasthash::{murmur3::Hasher32, FastHasher};
+use std::hash::Hasher;
+
+/// Determine bucket value using `murmur3` hash function
+pub(in crate::executor::engine) fn str_to_bucket_id(s: &str, bucket_count: usize) -> u64 {
+    let mut hash = Hasher32::new();
+    hash.write(s.as_bytes());
+    hash.finish() % bucket_count as u64 + 1
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/src/executor/engine/cartridge/bucket/tests.rs b/src/executor/engine/cartridge/hash/tests.rs
similarity index 100%
rename from src/executor/engine/cartridge/bucket/tests.rs
rename to src/executor/engine/cartridge/hash/tests.rs
diff --git a/src/executor/engine/mock.rs b/src/executor/engine/mock.rs
index 4280872727..0456ef524e 100644
--- a/src/executor/engine/mock.rs
+++ b/src/executor/engine/mock.rs
@@ -1,19 +1,19 @@
-use std::cell::RefCell;
 use std::collections::HashMap;
 
 use crate::errors::QueryPlannerError;
+use crate::executor::bucket::Buckets;
+use crate::executor::engine::cartridge::hash::str_to_bucket_id;
 use crate::executor::engine::Engine;
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::result::{BoxExecuteFormat, Value};
 use crate::executor::vtable::VirtualTable;
 use crate::executor::Metadata;
-use crate::ir::operator::Relational;
 use crate::ir::relation::{Column, Table, Type};
-use crate::ir::value::Value as IrValue;
 
 #[derive(Debug, Clone)]
 pub struct MetadataMock {
     tables: HashMap<String, Table>,
+    bucket_count: usize,
 }
 
 impl Metadata for MetadataMock {
@@ -101,24 +101,27 @@ impl MetadataMock {
             Table::new_seg("\"t\"", columns.clone(), sharding_key).unwrap(),
         );
 
-        MetadataMock { tables }
+        MetadataMock {
+            tables,
+            bucket_count: 10000,
+        }
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct EngineMock {
     metadata: MetadataMock,
-    pub query_result: RefCell<BoxExecuteFormat>,
+    virtual_tables: HashMap<usize, VirtualTable>,
 }
 
 impl Engine for EngineMock {
     type Metadata = MetadataMock;
 
-    fn metadata(&self) -> Self::Metadata
+    fn metadata(&self) -> &Self::Metadata
     where
         Self: Sized,
     {
-        self.metadata.clone()
+        &self.metadata
     }
 
     fn has_metadata(&self) -> bool {
@@ -136,95 +139,85 @@ impl Engine for EngineMock {
 
     fn materialize_motion(
         &self,
-        plan: &mut ExecutionPlan,
+        _plan: &mut ExecutionPlan,
         motion_node_id: usize,
+        _buckets: &Buckets,
     ) -> Result<VirtualTable, QueryPlannerError> {
-        let sq_id = &plan.get_motion_child(motion_node_id)?;
-
-        let mut vtable = VirtualTable::new();
-
-        if let Relational::ScanSubQuery { alias, .. } =
-            &plan.get_ir_plan().get_relation_node(*sq_id)?
-        {
-            if let Some(name) = alias {
-                vtable.set_alias(name)?;
-            }
+        if let Some(virtual_table) = self.virtual_tables.get(&motion_node_id) {
+            Ok(virtual_table.clone())
+        } else {
+            Err(QueryPlannerError::CustomError(
+                "No virtual table found for motion node".to_string(),
+            ))
         }
-
-        vtable.add_column(Column {
-            name: "identification_number".into(),
-            r#type: Type::Integer,
-        });
-
-        vtable.add_values_tuple(vec![IrValue::number_from_str("2")?]);
-        vtable.add_values_tuple(vec![IrValue::number_from_str("3")?]);
-
-        Ok(vtable)
     }
 
     fn exec(
         &self,
         plan: &mut ExecutionPlan,
         top_id: usize,
+        buckets: &Buckets,
     ) -> Result<BoxExecuteFormat, QueryPlannerError> {
+        let mut result = BoxExecuteFormat::new();
         let sql = plan.subtree_as_sql(top_id)?;
 
-        if let Some(shard_keys) = plan.discovery(top_id)? {
-            for shard in shard_keys {
-                self.exec_query(&shard, &sql)?;
+        match buckets {
+            Buckets::All => {
+                result.extend(cluster_exec_query(&sql)?)?;
+            }
+            Buckets::Filtered(list) => {
+                for bucket in list {
+                    let temp_result = bucket_exec_query(*bucket, &sql)?;
+                    result.extend(temp_result)?;
+                }
             }
-        } else {
-            self.mp_exec_query(&sql)?;
         }
 
-        let mut result = self.query_result.borrow_mut();
-
-        // sorting for test reproduce
+        // Sort results to make tests reproducible.
         result.rows.sort_by_key(|k| k[0].to_string());
-
-        Ok(result.clone())
+        Ok(result)
     }
 
-    fn determine_bucket_id(&self, _s: &str) -> usize {
-        1
+    fn determine_bucket_id(&self, s: &str) -> u64 {
+        str_to_bucket_id(s, self.metadata.bucket_count)
     }
 }
 
 impl EngineMock {
     pub fn new() -> Self {
-        let result_cell = RefCell::new(BoxExecuteFormat {
-            metadata: vec![],
-            rows: vec![],
-        });
-
         EngineMock {
             metadata: MetadataMock::new(),
-            query_result: result_cell,
+            virtual_tables: HashMap::new(),
         }
     }
 
-    fn exec_query(
-        &self,
-        shard_key: &str,
-        query: &str,
-    ) -> Result<BoxExecuteFormat, QueryPlannerError> {
-        let mut result = self.query_result.borrow_mut();
+    pub fn add_virtual_table(
+        &mut self,
+        id: usize,
+        table: VirtualTable,
+    ) -> Result<(), QueryPlannerError> {
+        self.virtual_tables.insert(id, table);
+        Ok(())
+    }
+}
 
-        result.rows.push(vec![
-            Value::String(format!("query send to [{}] shard", shard_key)),
-            Value::String(String::from(query)),
-        ]);
+fn bucket_exec_query(bucket: u64, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
+    let mut result = BoxExecuteFormat::new();
 
-        Ok(result.clone())
-    }
+    result.rows.push(vec![
+        Value::String(format!("Execute query on a bucket [{}]", bucket)),
+        Value::String(String::from(query)),
+    ]);
 
-    fn mp_exec_query(&self, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
-        let mut result = self.query_result.borrow_mut();
+    Ok(result.clone())
+}
 
-        result.rows.push(vec![
-            Value::String(String::from("query send to all shards")),
-            Value::String(String::from(query)),
-        ]);
-        Ok(result.clone())
-    }
+fn cluster_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
+    let mut result = BoxExecuteFormat::new();
+
+    result.rows.push(vec![
+        Value::String(String::from("Execute query on all buckets")),
+        Value::String(String::from(query)),
+    ]);
+    Ok(result.clone())
 }
diff --git a/src/executor/ir.rs b/src/executor/ir.rs
index e212dbdfe1..2a5c6535c2 100644
--- a/src/executor/ir.rs
+++ b/src/executor/ir.rs
@@ -8,13 +8,13 @@ use crate::ir::transformation::redistribution::MotionPolicy;
 use crate::ir::Plan;
 
 #[derive(Debug, Clone)]
-pub struct ExecutionPlan<'e> {
-    plan: &'e Plan,
+pub struct ExecutionPlan {
+    plan: Plan,
     vtables: Option<HashMap<usize, VirtualTable>>,
 }
 
-impl<'e> From<&'e Plan> for ExecutionPlan<'e> {
-    fn from(plan: &'e Plan) -> Self {
+impl From<Plan> for ExecutionPlan {
+    fn from(plan: Plan) -> Self {
         ExecutionPlan {
             plan,
             vtables: None,
@@ -22,9 +22,14 @@ impl<'e> From<&'e Plan> for ExecutionPlan<'e> {
     }
 }
 
-impl<'e> ExecutionPlan<'e> {
+impl ExecutionPlan {
     pub fn get_ir_plan(&self) -> &Plan {
-        self.plan
+        &self.plan
+    }
+
+    #[allow(dead_code)]
+    pub fn get_mut_ir_plan(&mut self) -> &mut Plan {
+        &mut self.plan
     }
 
     /// Add materialize motion result to translation map of virtual tables
@@ -55,7 +60,7 @@ impl<'e> ExecutionPlan<'e> {
 
     /// Get motion virtual table
     pub fn get_motion_vtable(&self, motion_id: usize) -> Result<VirtualTable, QueryPlannerError> {
-        if let Some(vtable) = self.vtables.clone() {
+        if let Some(vtable) = &self.vtables {
             if let Some(result) = vtable.get(&motion_id) {
                 return Ok(result.clone());
             }
diff --git a/src/executor/shard/tests.rs b/src/executor/shard/tests.rs
deleted file mode 100644
index c4b8e447dc..0000000000
--- a/src/executor/shard/tests.rs
+++ /dev/null
@@ -1,106 +0,0 @@
-use std::collections::HashSet;
-
-use pretty_assertions::assert_eq;
-
-use crate::executor::engine::mock::MetadataMock;
-use crate::executor::ir::ExecutionPlan;
-use crate::frontend::sql::ast::AbstractSyntaxTree;
-
-#[test]
-fn simple_union_query() {
-    let query = r#"SELECT * FROM (
-    SELECT * FROM "test_space" WHERE "sysFrom" > 0
-    UNION ALL
-    SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0
-    ) as "t3"
-    WHERE "id" = 1"#;
-
-    let metadata = &MetadataMock::new();
-
-    let ast = AbstractSyntaxTree::new(query).unwrap();
-    let mut plan = ast.to_ir(metadata).unwrap();
-    plan.add_motions().unwrap();
-    let mut ex_plan = ExecutionPlan::from(&plan);
-
-    let top = plan.get_top().unwrap();
-    // let expected = HashSet::from([3940]);
-    let expected = HashSet::from(["1".into()]);
-    assert_eq!(Some(expected), ex_plan.discovery(top).unwrap())
-}
-
-#[test]
-fn simple_disjunction_in_union_query() {
-    let query = r#"SELECT * FROM (
-    SELECT * FROM "test_space" WHERE "sysFrom" > 0
-    UNION ALL
-    SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0
-    ) as "t3"
-    WHERE ("id" = 1) OR ("id" = 100)"#;
-
-    let metadata = &MetadataMock::new();
-
-    let ast = AbstractSyntaxTree::new(query).unwrap();
-    let mut plan = ast.to_ir(metadata).unwrap();
-    plan.add_motions().unwrap();
-    let mut ex_plan = ExecutionPlan::from(&plan);
-
-    let top = plan.get_top().unwrap();
-    // let expected = HashSet::from([3940, 18512]);
-
-    let expected = HashSet::from(["1".into(), "100".into()]);
-    assert_eq!(Some(expected), ex_plan.discovery(top).unwrap())
-}
-
-#[test]
-fn complex_shard_key_union_query() {
-    let query = r#"SELECT *
-FROM
-    (SELECT "identification_number", "product_code"
-    FROM "hash_testing"
-    WHERE "sys_op" = 1
-    UNION ALL
-    SELECT "identification_number", "product_code"
-    FROM "hash_testing_hist"
-    WHERE "sys_op" > 1) AS "t3"
-WHERE "identification_number" = 1 AND "product_code" = '222'"#;
-
-    let metadata = &MetadataMock::new();
-
-    let ast = AbstractSyntaxTree::new(query).unwrap();
-    let mut plan = ast.to_ir(metadata).unwrap();
-    plan.add_motions().unwrap();
-    let mut ex_plan = ExecutionPlan::from(&plan);
-
-    let top = plan.get_top().unwrap();
-
-    assert_eq!(None, ex_plan.discovery(top).unwrap())
-}
-
-#[test]
-fn union_complex_cond_query() {
-    let query = r#"SELECT *
-FROM
-    (SELECT "identification_number", "product_code"
-    FROM "hash_testing"
-    WHERE "sys_op" = 1
-    UNION ALL
-    SELECT "identification_number", "product_code"
-    FROM "hash_testing_hist"
-    WHERE "sys_op" > 1) AS "t3"
-WHERE ("identification_number" = 1
-    OR ("identification_number" = 100
-    OR "identification_number" = 1000))
-    AND ("product_code" = '222'
-    OR "product_code" = '111')"#;
-
-    let metadata = &MetadataMock::new();
-
-    let ast = AbstractSyntaxTree::new(query).unwrap();
-    let mut plan = ast.to_ir(metadata).unwrap();
-    plan.add_motions().unwrap();
-    let mut ex_plan = ExecutionPlan::from(&plan);
-
-    let top = plan.get_top().unwrap();
-
-    assert_eq!(None, ex_plan.discovery(top).unwrap())
-}
diff --git a/src/executor/tests.rs b/src/executor/tests.rs
index 772327c91c..12fbe128cc 100644
--- a/src/executor/tests.rs
+++ b/src/executor/tests.rs
@@ -1,9 +1,10 @@
-use pretty_assertions::assert_eq;
-
+use super::*;
 use crate::executor::engine::mock::EngineMock;
 use crate::executor::result::Value;
-
-use super::*;
+use crate::executor::vtable::VirtualTable;
+use crate::ir::relation::{Column, Type};
+use crate::ir::value::Value as IrValue;
+use pretty_assertions::assert_eq;
 
 #[test]
 fn shard_query() {
@@ -11,13 +12,13 @@ fn shard_query() {
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket = query.engine.determine_bucket_id("1");
     expected
         .rows
         .push(vec![
-            Value::String(String::from("query send to [1] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket)),
             Value::String(String::from(r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME" FROM "test_space" WHERE ("test_space"."id") = (1)"#))
         ]);
     assert_eq!(expected, query.exec().unwrap())
@@ -39,13 +40,13 @@ fn shard_union_query() {
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket = query.engine.determine_bucket_id("1");
     expected
         .rows
         .push(vec![
-            Value::String(String::from("query send to [1] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket)),
             Value::String(
                 format!(
                     "{} {}{} {} {}{} {}",
@@ -69,11 +70,11 @@ fn map_reduce_query() {
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket = query.engine.determine_bucket_id(&["1", "457"].join(""));
     expected.rows.push(vec![
-        Value::String(String::from("query send to all shards")),
+        Value::String(format!("Execute query on a bucket [{}]", bucket)),
         Value::String(
             format!(
                 "{} {} {}",
@@ -94,14 +95,22 @@ fn linker_test() {
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
+    let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
+    let mut virtual_table = virtual_table_23();
+    virtual_table.set_alias("test").unwrap();
+    query
+        .engine
+        .add_virtual_table(motion_id, virtual_table)
+        .unwrap();
 
     let result = query.exec().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket2 = query.engine.determine_bucket_id("2");
+    let bucket3 = query.engine.determine_bucket_id("3");
     expected.rows.extend(vec![
         vec![
-            Value::String(String::from("query send to [2] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket3)),
             Value::String(format!(
                 "{} {} {}",
                 r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME""#,
@@ -110,7 +119,7 @@ fn linker_test() {
             )),
         ],
         vec![
-            Value::String(String::from("query send to [3] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket2)),
             Value::String(format!(
                 "{} {} {}",
                 r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME""#,
@@ -140,14 +149,22 @@ fn union_linker_test() {
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
+    let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
+    let mut virtual_table = virtual_table_23();
+    virtual_table.set_alias("\"t2\"").unwrap();
+    query
+        .engine
+        .add_virtual_table(motion_id, virtual_table)
+        .unwrap();
 
     let result = query.exec().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket2 = query.engine.determine_bucket_id("2");
+    let bucket3 = query.engine.determine_bucket_id("3");
     expected.rows.extend(vec![
         vec![
-            Value::String(String::from("query send to [2] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket3)),
             Value::String(
                 format!(
                     "{} {}{} {} {} {} {} {} {}{} {}",
@@ -166,7 +183,7 @@ fn union_linker_test() {
             )
         ],
         vec![
-            Value::String(String::from("query send to [3] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket2)),
             Value::String(
                 format!(
                     "{} {}{} {} {} {} {} {} {}{} {}",
@@ -210,45 +227,27 @@ INNER JOIN
     FROM "hash_single_testing_hist"
     WHERE "sys_op" <= 0) AS "t8"
     ON "t3"."id" = "t8"."identification_number"
-WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#;
+WHERE "t3"."id" = 2 AND "t8"."identification_number" = 2"#;
 
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
+    let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
+    let mut virtual_table = virtual_table_23();
+    virtual_table.set_alias("\"t8\"").unwrap();
+    query
+        .engine
+        .add_virtual_table(motion_id, virtual_table)
+        .unwrap();
 
     let result = query.exec().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket2 = query.engine.determine_bucket_id("2");
 
     expected.rows.extend(vec![
         vec![
-            Value::String(String::from("query send to [1] shard")),
-            Value::String(
-                format!(
-                    "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}",
-                    r#"SELECT "t3"."id" as "id""#,
-                    r#""t3"."FIRST_NAME" as "FIRST_NAME""#,
-                    r#""t8"."identification_number" as "identification_number""#,
-                    r#"FROM ("#,
-                    r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#,
-                    r#"FROM "test_space""#,
-                    r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#,
-                    r#"UNION ALL"#,
-                    r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#,
-                    r#"FROM "test_space_hist""#,
-                    r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#,
-                    r#") as "t3""#,
-                    r#"INNER JOIN"#,
-                    r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#,
-                    r#") as "t8""#,
-                    r#"ON ("t3"."id") = ("t8"."identification_number")"#,
-                    r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"#
-                )
-            )
-        ],
-        vec![
-            Value::String(String::from("query send to [2] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket2)),
             Value::String(
                 format!(
                     "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}",
@@ -258,7 +257,7 @@ WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#;
                     r#"FROM ("#,
                     r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#,
                     r#"FROM "test_space""#,
-                    r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#,
+                    r#"WHERE (0) > ("test_space"."sys_op") and ("test_space"."sysFrom") >= (0)"#,
                     r#"UNION ALL"#,
                     r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#,
                     r#"FROM "test_space_hist""#,
@@ -267,33 +266,8 @@ WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#;
                     r#"INNER JOIN"#,
                     r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#,
                     r#") as "t8""#,
-                    r#"ON ("t3"."id") = ("t8"."identification_number")"#,
-                    r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"#
-                )
-            )
-        ],
-        vec![
-            Value::String(String::from("query send to [3] shard")),
-            Value::String(
-                format!(
-                    "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}",
-                    r#"SELECT "t3"."id" as "id""#,
-                    r#""t3"."FIRST_NAME" as "FIRST_NAME""#,
-                    r#""t8"."identification_number" as "identification_number""#,
-                    r#"FROM ("#,
-                    r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#,
-                    r#"FROM "test_space""#,
-                    r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#,
-                    r#"UNION ALL"#,
-                    r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#,
-                    r#"FROM "test_space_hist""#,
-                    r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#,
-                    r#") as "t3""#,
-                    r#"INNER JOIN"#,
-                    r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#,
-                    r#") as "t8""#,
-                    r#"ON ("t3"."id") = ("t8"."identification_number")"#,
-                    r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"#
+                    r#"ON ("t3"."id") = (SELECT COLUMN_4 as "identification_number" FROM (VALUES (2),(3)))"#,
+                    r#"WHERE ("t3"."id", "t8"."identification_number") = (2, 2)"#
                 )
             )
         ],
@@ -311,14 +285,25 @@ fn anonymous_col_index_test() {
     let engine = EngineMock::new();
 
     let mut query = Query::new(engine, sql).unwrap();
-    query.optimize().unwrap();
+    let motion1_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
+    query
+        .engine
+        .add_virtual_table(motion1_id, virtual_table_23())
+        .unwrap();
+    let motion2_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][1];
+    query
+        .engine
+        .add_virtual_table(motion2_id, virtual_table_23())
+        .unwrap();
 
     let result = query.exec().unwrap();
 
     let mut expected = BoxExecuteFormat::new();
+    let bucket2 = query.engine.determine_bucket_id("2");
+    let bucket3 = query.engine.determine_bucket_id("3");
     expected.rows.extend(vec![
         vec![
-            Value::String(String::from("query send to [2] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket3)),
             Value::String(format!(
                 "{} {}, {}, {}, {}, {} {} {} {} {} {}",
                 "SELECT",
@@ -335,7 +320,7 @@ fn anonymous_col_index_test() {
             )),
         ],
         vec![
-            Value::String(String::from("query send to [3] shard")),
+            Value::String(format!("Execute query on a bucket [{}]", bucket2)),
             Value::String(format!(
                 "{} {}, {}, {}, {}, {} {} {} {} {} {}",
                 "SELECT",
@@ -355,3 +340,18 @@ fn anonymous_col_index_test() {
 
     assert_eq!(expected, result)
 }
+
+/// Helper function to create a "test" virtual table.
+fn virtual_table_23() -> VirtualTable {
+    let mut virtual_table = VirtualTable::new();
+
+    virtual_table.add_column(Column {
+        name: "identification_number".into(),
+        r#type: Type::Integer,
+    });
+
+    virtual_table.add_values_tuple(vec![IrValue::number_from_str("2").unwrap()]);
+    virtual_table.add_values_tuple(vec![IrValue::number_from_str("3").unwrap()]);
+
+    virtual_table
+}
diff --git a/src/executor/vtable.rs b/src/executor/vtable.rs
index 45f04eb276..260dd0addf 100644
--- a/src/executor/vtable.rs
+++ b/src/executor/vtable.rs
@@ -77,8 +77,8 @@ impl VirtualTable {
 
     /// Get tuples was distributed by sharding keys
     #[must_use]
-    pub fn get_tuple_distribution(&self) -> HashMap<String, HashSet<usize>> {
-        self.hashing.clone()
+    pub fn get_tuple_distribution(&self) -> &HashMap<String, HashSet<usize>> {
+        &self.hashing
     }
 
     /// Distribute tuples by sharding key columns
diff --git a/src/frontend/sql/ir/tests.rs b/src/frontend/sql/ir/tests.rs
index 3889c984b3..b0388dcd1b 100644
--- a/src/frontend/sql/ir/tests.rs
+++ b/src/frontend/sql/ir/tests.rs
@@ -13,9 +13,9 @@ fn simple_query_to_ir() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -38,9 +38,9 @@ fn complex_cond_query_transform() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -70,9 +70,9 @@ fn simple_union_query_transform() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -109,9 +109,9 @@ WHERE ("identification_number" = 1
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -139,9 +139,9 @@ fn sub_query_in_selection() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -166,9 +166,9 @@ fn inner_join() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -215,9 +215,9 @@ fn simple_query_with_unquoted_aliases() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
@@ -256,9 +256,9 @@ fn inner_join_1() {
     let metadata = &MetadataMock::new();
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan = ast.to_ir(metadata).unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
 
     assert_eq!(
diff --git a/src/ir/transformation/bool_in/tests.rs b/src/ir/transformation/bool_in/tests.rs
index 1b327495a6..0b415c6889 100644
--- a/src/ir/transformation/bool_in/tests.rs
+++ b/src/ir/transformation/bool_in/tests.rs
@@ -12,9 +12,9 @@ fn bool_in1() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.replace_in_operator().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -35,9 +35,9 @@ fn bool_in2() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.replace_in_operator().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -57,9 +57,9 @@ fn bool_in3() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.replace_in_operator().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
diff --git a/src/ir/transformation/dnf.rs b/src/ir/transformation/dnf.rs
index c3b1ee1702..6e112fc7ee 100644
--- a/src/ir/transformation/dnf.rs
+++ b/src/ir/transformation/dnf.rs
@@ -77,7 +77,7 @@ use std::collections::VecDeque;
 
 /// A chain of the trivalents (boolean or NULL expressions) concatenated by AND.
 #[derive(Clone, Debug)]
-struct Chain {
+pub struct Chain {
     nodes: VecDeque<usize>,
 }
 
@@ -144,6 +144,11 @@ impl Chain {
             top_id.ok_or_else(|| QueryPlannerError::CustomError("Empty chain".into()))?;
         Ok(new_top_id)
     }
+
+    /// Return a mutable reference to the chain nodes.
+    pub fn get_mut_nodes(&mut self) -> &mut VecDeque<usize> {
+        &mut self.nodes
+    }
 }
 
 fn call_expr_tree_to_dnf(plan: &mut Plan, top_id: usize) -> Result<usize, QueryPlannerError> {
@@ -151,14 +156,12 @@ fn call_expr_tree_to_dnf(plan: &mut Plan, top_id: usize) -> Result<usize, QueryP
 }
 
 impl Plan {
-    /// Convert an expression tree of trivalent nodes to a disjunctive normal form (DNF).
+    /// Get the DNF "AND" chains from the expression tree.
     ///
     /// # Errors
     /// - If the expression tree is not a trivalent expression.
     /// - Failed to append node to the AND chain.
-    /// - Failed to convert the AND chain to a new expression tree.
-    /// - Failed to concatenate the AND expression trees to the OR tree.
-    pub fn expr_tree_to_dnf(&mut self, top_id: usize) -> Result<usize, QueryPlannerError> {
+    pub fn get_dnf_chains(&self, top_id: usize) -> Result<VecDeque<Chain>, QueryPlannerError> {
         let mut result: VecDeque<Chain> = VecDeque::new();
         let mut stack: Vec<Chain> = Vec::new();
 
@@ -199,6 +202,18 @@ impl Plan {
             stack.push(chain);
         }
 
+        Ok(result)
+    }
+
+    /// Convert an expression tree of trivalent nodes to a disjunctive normal form (DNF).
+    ///
+    /// # Errors
+    /// - Failed to retrieve DNF chains.
+    /// - Failed to convert the AND chain to a new expression tree.
+    /// - Failed to concatenate the AND expression trees to the OR tree.
+    pub fn expr_tree_to_dnf(&mut self, top_id: usize) -> Result<usize, QueryPlannerError> {
+        let mut result = self.get_dnf_chains(top_id)?;
+
         let mut new_top_id: Option<usize> = None;
         while let Some(mut chain) = result.pop_front() {
             let ir_chain_top = chain.as_plan(self)?;
diff --git a/src/ir/transformation/dnf/tests.rs b/src/ir/transformation/dnf/tests.rs
index ec46970c05..5a5e230b2b 100644
--- a/src/ir/transformation/dnf/tests.rs
+++ b/src/ir/transformation/dnf/tests.rs
@@ -13,9 +13,9 @@ fn dnf1() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.set_dnf().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -37,9 +37,9 @@ fn dnf2() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.set_dnf().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -61,9 +61,9 @@ fn dnf3() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.set_dnf().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -84,9 +84,9 @@ fn dnf4() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.set_dnf().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -107,9 +107,9 @@ fn dnf5() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.set_dnf().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
diff --git a/src/ir/transformation/equality_propagation.rs b/src/ir/transformation/equality_propagation.rs
index 910229acb2..4836ca20e0 100644
--- a/src/ir/transformation/equality_propagation.rs
+++ b/src/ir/transformation/equality_propagation.rs
@@ -324,9 +324,6 @@ impl Nodes {
                 }
             }
         }
-        if tops.is_empty() {
-            return Err(QueryPlannerError::RedundantTransformation);
-        }
         Ok(tops)
     }
 
diff --git a/src/ir/transformation/merge_tuples/tests.rs b/src/ir/transformation/merge_tuples/tests.rs
index 8e8e1d8a2f..1ec3f7c0e8 100644
--- a/src/ir/transformation/merge_tuples/tests.rs
+++ b/src/ir/transformation/merge_tuples/tests.rs
@@ -12,9 +12,9 @@ fn merge_tuples1() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.merge_tuples().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -36,9 +36,9 @@ fn merge_tuples2() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.merge_tuples().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -59,9 +59,9 @@ fn merge_tuples3() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.merge_tuples().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!("{}", r#"SELECT "t"."a" as "a" FROM "t" WHERE true"#,),
@@ -77,9 +77,9 @@ fn merge_tuples4() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.merge_tuples().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -98,9 +98,9 @@ fn merge_tuples5() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.merge_tuples().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
diff --git a/src/ir/transformation/split_columns/tests.rs b/src/ir/transformation/split_columns/tests.rs
index 26787ab279..040b4e0e7c 100644
--- a/src/ir/transformation/split_columns/tests.rs
+++ b/src/ir/transformation/split_columns/tests.rs
@@ -12,9 +12,9 @@ fn split_columns1() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.split_columns().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -33,9 +33,9 @@ fn split_columns2() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.split_columns().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -73,9 +73,9 @@ fn split_columns4() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.split_columns().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
@@ -94,9 +94,9 @@ fn split_columns5() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let mut plan = ast.to_ir(metadata).unwrap();
     plan.split_columns().unwrap();
-    let ex_plan = ExecutionPlan::from(&plan);
+    let ex_plan = ExecutionPlan::from(plan);
 
-    let top_id = plan.get_top().unwrap();
+    let top_id = ex_plan.get_ir_plan().get_top().unwrap();
     let sql = ex_plan.subtree_as_sql(top_id).unwrap();
     assert_eq!(
         format!(
diff --git a/src/ir/value.rs b/src/ir/value.rs
index ce525ae981..2ddccbf14e 100644
--- a/src/ir/value.rs
+++ b/src/ir/value.rs
@@ -107,6 +107,17 @@ impl Value {
     }
 }
 
+impl From<Value> for String {
+    fn from(v: Value) -> Self {
+        match v {
+            Value::Boolean(b) => b.to_string(),
+            Value::Null => "NULL".to_string(),
+            Value::Number(n) => n.to_string(),
+            Value::String(s) => s,
+        }
+    }
+}
+
 impl From<Trivalent> for Value {
     fn from(f: Trivalent) -> Self {
         match f {
diff --git a/src/parser.rs b/src/parser.rs
index 0f2ee65fee..12041bbc01 100644
--- a/src/parser.rs
+++ b/src/parser.rs
@@ -90,10 +90,6 @@ pub extern "C" fn execute_query(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
             }
         };
 
-        if let Err(e) = query.optimize() {
-            return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
-        }
-
         match query.exec() {
             Ok(q) => {
                 ctx.return_mp(&q).unwrap();
-- 
GitLab