From 56933ca14e3d105b12072f758dc5f31d28b161e6 Mon Sep 17 00:00:00 2001
From: Arseniy Volynets <a.volynets@picodata.io>
Date: Tue, 1 Oct 2024 23:06:53 +0300
Subject: [PATCH] feat: show buckets estimation in explain

- Add new line in explain reporting on which buckets query will
  be executed.
- For queries consisting of a single subtree we can say exactly on
  which buckets it will be executed, for queries with more subtrees
  (with motions), we provide an upper bound of total buckets used
  in the query. Upper bound is computed by merging buckets from the
  leaf subtrees.
- In case for DML query with non-local motion we can't provide an
  upper bound, and print 'buckets: unknown'

Examples:
```
explain select a from t
->
projection ("t"."a"::integer -> "a")
    scan "t"
execution options:
    vdbe_max_steps = 45000
    vtable_max_rows = 5000
buckets = [1-3000]

explain select t.a from t join t as t2
on t.a = t2.b
->
projection ("t"."a"::integer -> "a")
  ...
execution options:
    vdbe_max_steps = 45000
    vtable_max_rows = 5000
buckets <= [1-3000]

explain select id from _pico_table
->
projection ("_pico_table"."id"::unsigned -> "id")
    scan "_pico_table"
execution options:
    vdbe_max_steps = 45000
    vtable_max_rows = 5000
buckets = any

explain insert into t values (1, 2)
->
insert "t" on conflict: fail
motion [policy: segment([ref("COLUMN_1")])]
    values
        value row (...)
execution options:
    vdbe_max_steps = 45000
    vtable_max_rows = 5000
buckets = unknown
```
---
 .../test/integration/explain_test.lua         |  12 +
 .../test/integration/left_outer_join_test.lua |   5 +
 sbroad-core/src/executor.rs                   |  11 +-
 sbroad-core/src/executor/tests/frontend.rs    |  10 +-
 sbroad-core/src/ir/explain.rs                 |  84 ++++-
 sbroad-core/src/ir/explain/execution_info.rs  | 200 ++++++++++++
 sbroad-core/src/ir/explain/tests.rs           |  25 +-
 .../src/ir/explain/tests/query_explain.rs     | 302 ++++++++++++++++++
 sbroad-core/src/ir/node/relational.rs         |  50 +++
 9 files changed, 690 insertions(+), 9 deletions(-)
 create mode 100644 sbroad-core/src/ir/explain/execution_info.rs
 create mode 100644 sbroad-core/src/ir/explain/tests/query_explain.rs

diff --git a/sbroad-cartridge/test_app/test/integration/explain_test.lua b/sbroad-cartridge/test_app/test/integration/explain_test.lua
index 8f198b80e..ddc3baf89 100644
--- a/sbroad-cartridge/test_app/test/integration/explain_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/explain_test.lua
@@ -120,6 +120,7 @@ g.test_motion_explain = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 end
@@ -160,6 +161,7 @@ WHERE "t3"."name" = '123']], {} })
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = unknown"
         }
     )
 end
@@ -192,6 +194,7 @@ g.test_valid_explain = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [3940]"
         }
     )
 end
@@ -214,6 +217,7 @@ g.test_explain_arithmetic_selection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -230,6 +234,7 @@ g.test_explain_arithmetic_selection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -269,6 +274,7 @@ WHERE "t3"."id" = 2
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -308,6 +314,7 @@ WHERE "t3"."id" = 2
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 end
@@ -329,6 +336,7 @@ g.test_explain_arithmetic_projection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -345,6 +353,7 @@ g.test_explain_arithmetic_projection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -361,6 +370,7 @@ g.test_explain_arithmetic_projection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -377,6 +387,7 @@ g.test_explain_arithmetic_projection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 
@@ -392,6 +403,7 @@ g.test_explain_arithmetic_projection = function()
             "execution options:",
             "    vdbe_max_steps = 45000",
             "    vtable_max_rows = 5000",
+            "buckets = [1-30000]"
         }
     )
 end
diff --git a/sbroad-cartridge/test_app/test/integration/left_outer_join_test.lua b/sbroad-cartridge/test_app/test/integration/left_outer_join_test.lua
index 4bb9135f8..b541915f1 100644
--- a/sbroad-cartridge/test_app/test/integration/left_outer_join_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/left_outer_join_test.lua
@@ -212,6 +212,7 @@ left_join.test_left_join_local_execution = function()
         "execution options:",
         "    vdbe_max_steps = 45000",
         "    vtable_max_rows = 5000",
+        "buckets = [1-30000]"
     })
 end
 
@@ -254,6 +255,7 @@ left_join.test_inner_segment_motion = function()
         "execution options:",
         "    vdbe_max_steps = 45000",
         "    vtable_max_rows = 5000",
+        "buckets = unknown"
     })
 end
 
@@ -295,6 +297,7 @@ left_join.test_inner_full_motion = function()
         "execution options:",
         "    vdbe_max_steps = 45000",
         "    vtable_max_rows = 5000",
+        "buckets = [1-30000]"
     })
 end
 
@@ -419,6 +422,7 @@ left_join.test_sq_with_full_motion = function()
         "execution options:",
         "    vdbe_max_steps = 45000",
         "    vtable_max_rows = 5000",
+        "buckets = [1-30000]"
     })
 end
 
@@ -467,6 +471,7 @@ left_join.test_sq_with_segment_motion = function()
         "execution options:",
         "    vdbe_max_steps = 45000",
         "    vtable_max_rows = 5000",
+        "buckets = unknown"
     })
 end
 
diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs
index 12c3efc59..c8f4e10ca 100644
--- a/sbroad-core/src/executor.rs
+++ b/sbroad-core/src/executor.rs
@@ -266,7 +266,7 @@ where
     ///
     /// # Errors
     /// - Failed to build explain
-    pub fn produce_explain(&self) -> Result<Box<dyn Any>, SbroadError> {
+    pub fn produce_explain(&mut self) -> Result<Box<dyn Any>, SbroadError> {
         self.coordinator.explain_format(self.to_explain()?)
     }
 
@@ -318,8 +318,8 @@ where
     ///
     /// # Errors
     /// - Failed to build explain
-    pub fn to_explain(&self) -> Result<SmolStr, SbroadError> {
-        self.exec_plan.get_ir_plan().as_explain()
+    pub fn to_explain(&mut self) -> Result<SmolStr, SbroadError> {
+        self.as_explain()
     }
 
     /// Checks that query is explain and have not to be executed
@@ -363,6 +363,11 @@ where
             .unwrap()
     }
 
+    #[must_use]
+    pub fn get_buckets(&self, output_id: NodeId) -> Option<&Buckets> {
+        self.bucket_map.get(&output_id)
+    }
+
     /// Checks that query is for plugin.
     ///
     /// # Errors
diff --git a/sbroad-core/src/executor/tests/frontend.rs b/sbroad-core/src/executor/tests/frontend.rs
index 004181802..bb6d4af0a 100644
--- a/sbroad-core/src/executor/tests/frontend.rs
+++ b/sbroad-core/src/executor/tests/frontend.rs
@@ -76,6 +76,7 @@ fn front_explain_select_sql1() {
 execution options:
     vdbe_max_steps = 45000
     vtable_max_rows = 5000
+buckets = [1-10000]
 "#,
     );
 
@@ -96,7 +97,7 @@ fn front_explain_select_sql2() {
     let mut query = Query::new(metadata, sql, vec![]).unwrap();
 
     let expected_explain: SmolStr = format_smolstr!(
-        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
+        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
         r#"union all"#,
         r#"    projection ("t"."identification_number"::integer -> "c1", "t"."product_code"::string -> "product_code")"#,
         r#"        scan "hash_testing" -> "t""#,
@@ -105,6 +106,7 @@ fn front_explain_select_sql2() {
         r#"execution options:"#,
         r#"    vdbe_max_steps = 45000"#,
         r#"    vtable_max_rows = 5000"#,
+        r#"buckets = [1-10000]"#,
     );
 
     if let Ok(actual_explain) = query.dispatch().unwrap().downcast::<SmolStr>() {
@@ -124,7 +126,7 @@ fn front_explain_select_sql3() {
     let mut query = Query::new(metadata, sql, vec![]).unwrap();
 
     let expected_explain: SmolStr = format_smolstr!(
-        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
+        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
         r#"projection ("q1"."a"::string -> "a")"#,
         r#"    join on ROW("q1"."a"::string) = ROW("q2"."a2"::string)"#,
         r#"        scan "q1""#,
@@ -136,6 +138,7 @@ fn front_explain_select_sql3() {
         r#"execution options:"#,
         r#"    vdbe_max_steps = 45000"#,
         r#"    vtable_max_rows = 5000"#,
+        r#"buckets = [1-10000]"#,
     );
 
     if let Ok(actual_explain) = query.dispatch().unwrap().downcast::<SmolStr>() {
@@ -155,7 +158,7 @@ fn front_explain_select_sql4() {
     let mut query = Query::new(metadata, sql, vec![]).unwrap();
 
     let expected_explain: SmolStr = format_smolstr!(
-        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
+        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
         r#"projection ("q2"."a"::string -> "a")"#,
         r#"    join on ROW("q1"."a"::string) = ROW("q2"."a"::string)"#,
         r#"        scan "q1""#,
@@ -167,6 +170,7 @@ fn front_explain_select_sql4() {
         r#"execution options:"#,
         r#"    vdbe_max_steps = 45000"#,
         r#"    vtable_max_rows = 5000"#,
+        r#"buckets = [1-10000]"#,
     );
 
     if let Ok(actual_explain) = query.dispatch().unwrap().downcast::<SmolStr>() {
diff --git a/sbroad-core/src/ir/explain.rs b/sbroad-core/src/ir/explain.rs
index b555e2e35..fbb9c68af 100644
--- a/sbroad-core/src/ir/explain.rs
+++ b/sbroad-core/src/ir/explain.rs
@@ -7,7 +7,11 @@ use serde::Serialize;
 use smol_str::{format_smolstr, SmolStr, ToSmolStr};
 
 use crate::errors::{Entity, SbroadError};
+use crate::executor::bucket::Buckets;
 use crate::executor::engine::helpers::to_user;
+use crate::executor::engine::Router;
+use crate::executor::Query;
+use crate::ir::explain::execution_info::BucketsInfo;
 use crate::ir::expression::cast::Type as CastType;
 use crate::ir::expression::TrimKind;
 use crate::ir::node::{
@@ -1028,7 +1032,7 @@ impl ExplainTreePart {
     }
 }
 
-#[derive(Debug, Serialize, Default)]
+#[derive(Debug, Default)]
 struct FullExplain {
     /// Main sql subtree
     main_query: ExplainTreePart,
@@ -1036,6 +1040,46 @@ struct FullExplain {
     subqueries: Vec<ExplainTreePart>,
     /// Options imposed during query execution
     exec_options: Vec<(OptionKind, Value)>,
+    /// Info related to plan execution
+    buckets_info: Option<BucketsInfo>,
+}
+
+fn buckets_repr(buckets: &Buckets, bucket_count: u64) -> String {
+    match buckets {
+        Buckets::All => format!("[1-{bucket_count}]"),
+        Buckets::Filtered(buckets_set) => 'f: {
+            if buckets_set.is_empty() {
+                break 'f "[]".into();
+            }
+
+            let mut nums: Vec<u64> = buckets_set.iter().copied().collect();
+            nums.sort_unstable();
+
+            let mut ranges = Vec::new();
+            let mut l = 0;
+            for r in 1..nums.len() {
+                if nums[r - 1] + 1 == nums[r] {
+                    continue;
+                }
+                if r - l == 1 {
+                    ranges.push(format!("{}", nums[l]));
+                } else {
+                    ranges.push(format!("{}-{}", nums[l], nums[r - 1]))
+                }
+                l = r;
+            }
+
+            let r = nums.len();
+            if r - l == 1 {
+                ranges.push(format!("{}", nums[r - 1]));
+            } else {
+                ranges.push(format!("{}-{}", nums[l], nums[r - 1]))
+            }
+
+            format!("[{}]", ranges.join(","))
+        }
+        Buckets::Any => "any".into(),
+    }
 }
 
 impl Display for FullExplain {
@@ -1052,6 +1096,22 @@ impl Display for FullExplain {
                 writeln!(s, "{:4}{} = {}", "", opt.0, opt.1)?;
             }
         }
+        if let Some(info) = &self.buckets_info {
+            match info {
+                BucketsInfo::Unknown => writeln!(s, "buckets = unknown")?,
+                BucketsInfo::Calculated(calculated) => {
+                    let repr = buckets_repr(&calculated.buckets, calculated.bucket_count);
+                    // For buckets ANY and ALL there is no sense to handle in the
+                    // output the case when bucket count is not exact.
+                    match calculated.buckets {
+                        Buckets::Any | Buckets::All => writeln!(s, "buckets = {repr}",)?,
+                        _ if calculated.is_exact => writeln!(s, "buckets = {repr}",)?,
+                        _ => writeln!(s, "buckets <= {repr}",)?,
+                    }
+                }
+            }
+        }
+
         write!(f, "{s}")
     }
 }
@@ -1275,8 +1335,8 @@ impl FullExplain {
                             MotionPolicy::LocalSegment(MotionKey { targets })
                         }
                     };
-                    let m = Motion::new(p);
 
+                    let m = Motion::new(p);
                     Some(ExplainNode::Motion(m))
                 }
                 Relational::Join(Join {
@@ -1365,6 +1425,7 @@ impl FullExplain {
                     Some(ExplainNode::Limit(*limit))
                 }
             };
+
             stack.push(current_node);
         }
         result.main_query = stack
@@ -1372,6 +1433,10 @@ impl FullExplain {
             .ok_or_else(|| SbroadError::NotFound(Entity::Node, "that is explain top".into()))?;
         Ok(result)
     }
+
+    fn add_execution_info(&mut self, info: BucketsInfo) {
+        self.buckets_info = Some(info);
+    }
 }
 
 impl Plan {
@@ -1387,5 +1452,20 @@ impl Plan {
     }
 }
 
+impl<'a, C: Router> Query<'a, C> {
+    pub fn as_explain(&mut self) -> Result<SmolStr, SbroadError> {
+        let plan = self.get_exec_plan().get_ir_plan();
+        let top_id = plan.get_top()?;
+        let mut explain = FullExplain::new(plan, top_id)?;
+
+        let info = BucketsInfo::new_from_query(self)?;
+        explain.add_execution_info(info);
+
+        Ok(explain.to_smolstr())
+    }
+}
+
 #[cfg(test)]
 mod tests;
+
+mod execution_info;
diff --git a/sbroad-core/src/ir/explain/execution_info.rs b/sbroad-core/src/ir/explain/execution_info.rs
new file mode 100644
index 000000000..a6f6324ab
--- /dev/null
+++ b/sbroad-core/src/ir/explain/execution_info.rs
@@ -0,0 +1,200 @@
+use ahash::AHashSet;
+
+use crate::{
+    errors::SbroadError,
+    executor::{
+        bucket::Buckets,
+        engine::{Router, Vshard},
+        Query,
+    },
+    ir::{
+        node::{relational::Relational, Motion, Node, NodeId},
+        transformation::redistribution::MotionPolicy,
+        tree::traversal::{LevelNode, PostOrder, PostOrderWithFilter, REL_CAPACITY},
+        Plan,
+    },
+};
+
+#[derive(Debug)]
+pub struct CalculatedBuckets {
+    /// Estimated buckets on which whole plan will be executed.
+    pub buckets: Buckets,
+    /// True if estimation is correct, otherwise
+    /// it means this is an upper bound.
+    pub is_exact: bool,
+    /// Total number of buckets in cluster
+    pub bucket_count: u64,
+}
+
+#[derive(Debug)]
+pub enum BucketsInfo {
+    /// We can't calculate buckets for this query,
+    /// see `can_estimate_buckets`
+    Unknown,
+    Calculated(CalculatedBuckets),
+}
+
+impl BucketsInfo {
+    pub fn new_calculated(buckets: Buckets, is_exact: bool, bucket_count: u64) -> Self {
+        BucketsInfo::Calculated(CalculatedBuckets {
+            buckets,
+            is_exact,
+            bucket_count,
+        })
+    }
+
+    /// Estimate on which buckets query will be executed.
+    /// If query consists only of single subtree we
+    /// can predict buckets precisely. If there are multiple
+    /// subtrees we calculate the upper bound:
+    ///
+    /// We gather all subtrees from plan that don't have
+    /// non-local motions and call `bucket_discovery` for
+    /// each such node, then we merge (disjunct) all buckets
+    /// for upper bound estimate.
+    ///
+    /// In case we can't compute buckets for this query, we
+    /// `BucketsInfo::Unknown` variant.
+    pub fn new_from_query<R: Router>(query: &mut Query<'_, R>) -> Result<Self, SbroadError> {
+        let ir = query.get_exec_plan().get_ir_plan();
+        if !Self::can_estimate_buckets(ir)? {
+            return Ok(BucketsInfo::Unknown);
+        }
+
+        let top_id = ir.get_top()?;
+
+        let mut dfs_tree = PostOrder::with_capacity(|node| ir.nodes.rel_iter(node), REL_CAPACITY);
+        // Stores previously computed results for each
+        // child of the current node: weather the child
+        // has non-local motion in its subtree.
+        let mut stack: Vec<(NodeId, bool)> = Vec::new();
+        // Ids of nodes that don't have non-local motions in their subtrees.
+        // We can safely call `bucket_discovery` on such nodes. For buckets
+        // estimation we take union of all buckets produced by those nodes:
+        //
+        // m - non-local motion, n - any other kind of node
+        //
+        //               n1
+        //              /  \
+        //             m1  n2
+        //             |
+        //             n3
+        //            /  \
+        //           m4  m5
+        //           |   |
+        //           n4  n5
+        //
+        // For such subtree, we would have: {n4, n5, n2}
+        // For single subtree without motions, we would have only root node.
+        let mut without_motions_ids: AHashSet<NodeId> = AHashSet::new();
+        // Ids of children of current node, that don't have non-local motions
+        // in their subtree. If current node is a non-local motion or
+        // some children have such motions in their subtrees, then
+        // such children are to be used for buckets estimation.
+        let mut cur_children_without_motions: Vec<NodeId> = Vec::new();
+        for LevelNode(_, id) in dfs_tree.iter(top_id) {
+            let rel = ir.get_relation_node(id)?;
+
+            // true if this subtree has non-local motion
+            let mut has_non_local_motion = false;
+            for _ in 0..rel.children().len() {
+                let (child_id, child_value) = stack.pop().expect("rel iter visits all children");
+
+                if !child_value {
+                    cur_children_without_motions.push(child_id);
+                }
+
+                has_non_local_motion = has_non_local_motion || child_value;
+            }
+
+            if rel.is_non_local_motion() {
+                has_non_local_motion = true;
+            }
+            if has_non_local_motion {
+                without_motions_ids.extend(cur_children_without_motions.iter());
+            }
+            cur_children_without_motions.clear();
+
+            if !has_non_local_motion && top_id == id {
+                without_motions_ids.insert(id);
+            }
+
+            stack.push((id, has_non_local_motion));
+        }
+
+        let mut estimated_buckets: Option<Buckets> = None;
+        for child_id in &without_motions_ids {
+            let buckets = query.bucket_discovery(*child_id)?;
+            if let Some(ebuckets) = estimated_buckets.as_mut() {
+                *ebuckets = ebuckets.disjunct(&buckets)?;
+            } else {
+                estimated_buckets = Some(buckets);
+            }
+        }
+
+        let buckets = estimated_buckets.expect("there's at least one subtree");
+
+        // Estimation is exact if we only have single
+        // executable subtree == whole plan
+        let is_exact = without_motions_ids.len() == 1 && without_motions_ids.contains(&top_id);
+        let bucket_count = query
+            .get_coordinator()
+            .get_current_vshard_object()?
+            .bucket_count();
+
+        let buckets_info = BucketsInfo::new_calculated(buckets, is_exact, bucket_count);
+
+        Ok(buckets_info)
+    }
+
+    /// Currently we can't estimate buckets for DML queries with
+    /// non-local motions:
+    /// insert
+    ///    Motion(Segment)
+    ///        Values (...)
+    ///
+    /// If we estimate whole query buckets by buckets of its leaf subtree,
+    /// we get that the whole query will be executed on no more than one
+    /// node (buckets `Any` corresponds to 1 node execution), which is
+    /// wrong.
+    ///
+    /// Also we can't estimate buckets in plans with `Motion(Segment)`
+    /// because after we resharding, we can get any set of buckets.
+    fn can_estimate_buckets(plan: &Plan) -> Result<bool, SbroadError> {
+        let top_id = plan.get_top()?;
+
+        let mut contains_segment_motion = false;
+        let filter = Box::new(|id: NodeId| -> bool {
+            if !contains_segment_motion {
+                contains_segment_motion = matches!(
+                    plan.get_node(id),
+                    Ok(Node::Relational(Relational::Motion(Motion {
+                        policy: MotionPolicy::Segment(_),
+                        ..
+                    })))
+                );
+            }
+            false
+        });
+        let mut dfs = PostOrderWithFilter::with_capacity(|x| plan.nodes.rel_iter(x), 0, filter);
+        dfs.populate_nodes(top_id);
+        drop(dfs);
+
+        if contains_segment_motion {
+            return Ok(false);
+        }
+
+        let node = plan.get_relation_node(top_id)?;
+        if !node.is_dml() {
+            return Ok(true);
+        }
+        if plan.dml_node_table(top_id)?.is_global() {
+            return Ok(true);
+        }
+
+        let child_id = plan.get_relational_child(top_id, 0)?;
+        let child_node = plan.get_relation_node(child_id)?;
+
+        Ok(child_node.is_local_motion())
+    }
+}
diff --git a/sbroad-core/src/ir/explain/tests.rs b/sbroad-core/src/ir/explain/tests.rs
index d65f826b4..0767a6c4b 100644
--- a/sbroad-core/src/ir/explain/tests.rs
+++ b/sbroad-core/src/ir/explain/tests.rs
@@ -1,7 +1,7 @@
 use pretty_assertions::assert_eq;
 
 use super::*;
-use crate::ir::transformation::helpers::sql_to_optimized_ir;
+use crate::{collection, ir::transformation::helpers::sql_to_optimized_ir};
 
 #[test]
 fn simple_query_without_cond_plan() {
@@ -557,8 +557,31 @@ pub(crate) fn explain_check(sql: &str, explain: &str) {
     assert_eq!(explain, explain_tree.to_string());
 }
 
+#[test]
+fn check_buckets_repr() {
+    let bc = 3000;
+    assert_eq!("[1-3000]", buckets_repr(&Buckets::All, bc));
+    assert_eq!("any", buckets_repr(&Buckets::Any, bc));
+    assert_eq!(
+        "[1-3]",
+        buckets_repr(&Buckets::Filtered(collection!(1, 2, 3)), bc)
+    );
+    assert_eq!(
+        "[1-3]",
+        buckets_repr(&Buckets::Filtered(collection!(3, 2, 1)), bc)
+    );
+    assert_eq!(
+        "[1,10-11,21-23]",
+        buckets_repr(&Buckets::Filtered(collection!(1, 10, 11, 23, 22, 21)), bc)
+    );
+    assert_eq!("[]", buckets_repr(&Buckets::Filtered(collection!()), bc));
+}
+
 #[cfg(test)]
 mod concat;
 
 #[cfg(test)]
 mod delete;
+
+#[cfg(test)]
+mod query_explain;
diff --git a/sbroad-core/src/ir/explain/tests/query_explain.rs b/sbroad-core/src/ir/explain/tests/query_explain.rs
new file mode 100644
index 000000000..3e8f0239e
--- /dev/null
+++ b/sbroad-core/src/ir/explain/tests/query_explain.rs
@@ -0,0 +1,302 @@
+use pretty_assertions::assert_eq;
+use smol_str::ToSmolStr;
+
+use crate::executor::{engine::mock::RouterRuntimeMock, Query};
+
+#[test]
+fn test_query_explain_1() {
+    let sql = r#"select 1"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection (1::unsigned -> "col_1")
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = any
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_2() {
+    let sql = r#"select e from t2"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection ("t2"."e"::unsigned -> "e")
+    scan "t2"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = [1-10000]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_3() {
+    let sql = r#"select e from t2 where e = 1 and f = 13"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection ("t2"."e"::unsigned -> "e")
+    selection ROW("t2"."e"::unsigned) = ROW(1::unsigned) and ROW("t2"."f"::unsigned) = ROW(13::unsigned)
+        scan "t2"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = [111]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_4() {
+    let sql = r#"select count(*) from t2"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection (sum(("count_596"::integer))::decimal -> "col_1")
+    motion [policy: full]
+        projection (count((*::integer))::integer -> "count_596")
+            scan "t2"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = [1-10000]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_5() {
+    let sql = r#"select a from global_t"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection ("global_t"."a"::integer -> "a")
+    scan "global_t"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = any
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_6() {
+    let sql = r#"insert into t1 values (1, 1)"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"insert "t1" on conflict: fail
+    motion [policy: segment([ref("COLUMN_1"), ref("COLUMN_2")])]
+        values
+            value row (data=ROW(1::unsigned, 1::unsigned))
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = unknown
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_7() {
+    let sql = r#"insert into t1 select a, b from t1"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"insert "t1" on conflict: fail
+    motion [policy: local segment([ref("a"), ref("b")])]
+        projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b")
+            scan "t1"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = [1-10000]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_8() {
+    let sql = r#"insert into global_t values (1, 1)"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"insert "global_t" on conflict: fail
+    motion [policy: full]
+        values
+            value row (data=ROW(1::unsigned, 1::unsigned))
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = any
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_9() {
+    let sql = r#"delete from t2"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"delete "t2"
+    motion [policy: local]
+        projection ("t2"."g"::unsigned -> "pk_col_0", "t2"."h"::unsigned -> "pk_col_1")
+            scan "t2"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = [1-10000]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_10() {
+    let sql = r#"update t2 set e = 20 where (e, f) = (10, 10)"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"update "t2"
+"f" = "col_1"
+"h" = "col_3"
+"e" = "col_0"
+"g" = "col_2"
+    motion [policy: segment([])]
+        projection (20::unsigned -> "col_0", "t2"."f"::unsigned -> "col_1", "t2"."g"::unsigned -> "col_2", "t2"."h"::unsigned -> "col_3", "t2"."e"::unsigned -> "col_4", "t2"."f"::unsigned -> "col_5")
+            selection ROW("t2"."e"::unsigned, "t2"."f"::unsigned) = ROW(10::unsigned, 10::unsigned)
+                scan "t2"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = unknown
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_11() {
+    // This query contains Segment motion
+    // we can't estimate buckets in this case
+
+    let sql = r#"select a, count(b) from
+    (select e, f from t2 where (e, f) = (10, 10))
+    join
+    (select a, b from t1 where (a, b) = (20, 20))
+    on e = a
+    group by a
+"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection ("column_3496"::string -> "a", sum(("count_4196"::integer))::decimal -> "col_1")
+    group by ("column_3496"::string) output: ("column_3496"::string -> "column_3496", "count_4196"::integer -> "count_4196")
+        motion [policy: segment([ref("column_3496")])]
+            projection ("a"::string -> "column_3496", count(("b"::integer))::integer -> "count_4196")
+                group by ("a"::string) output: ("e"::unsigned -> "e", "f"::unsigned -> "f", "a"::string -> "a", "b"::integer -> "b")
+                    join on ROW("e"::unsigned) = ROW("a"::string)
+                        scan
+                            projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f")
+                                selection ROW("t2"."e"::unsigned, "t2"."f"::unsigned) = ROW(10::unsigned, 10::unsigned)
+                                    scan "t2"
+                        motion [policy: full]
+                            scan
+                                projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b")
+                                    selection ROW("t1"."a"::string, "t1"."b"::integer) = ROW(20::unsigned, 20::unsigned)
+                                        scan "t1"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = unknown
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_12() {
+    // This query does not contain
+    // segment motions and we can estimate it!
+
+    let sql = r#"select a from
+    (select e, f from t2 where (e, f) = (10, 10))
+    join
+    (select a, b from t1 where (a, b) = (20, 20))
+    on e = a
+"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection ("a"::string -> "a")
+    join on ROW("e"::unsigned) = ROW("a"::string)
+        scan
+            projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f")
+                selection ROW("t2"."e"::unsigned, "t2"."f"::unsigned) = ROW(10::unsigned, 10::unsigned)
+                    scan "t2"
+        motion [policy: full]
+            scan
+                projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b")
+                    selection ROW("t1"."a"::string, "t1"."b"::integer) = ROW(20::unsigned, 20::unsigned)
+                        scan "t1"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets <= [62,2132]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_13() {
+    let sql = r#"insert into global_t select a, b from t1 where (a, b) = (1, 1)"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"insert "global_t" on conflict: fail
+    motion [policy: full]
+        projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b")
+            selection ROW("t1"."a"::string, "t1"."b"::integer) = ROW(1::unsigned, 1::unsigned)
+                scan "t1"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets <= [6691]
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
+
+#[test]
+fn test_query_explain_14() {
+    let sql = r#"select a, b from t1 where (a, b) = (1, 1) and (a, b) = (2, 2)"#;
+
+    let metadata = &RouterRuntimeMock::new();
+    let mut query = Query::new(metadata, sql, vec![]).unwrap();
+    let expected = r#"projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b")
+    selection ROW("t1"."a"::string, "t1"."b"::integer) = ROW(1::unsigned, 1::unsigned) and ROW("t1"."a"::string, "t1"."b"::integer) = ROW(2::unsigned, 2::unsigned)
+        scan "t1"
+execution options:
+    vdbe_max_steps = 45000
+    vtable_max_rows = 5000
+buckets = []
+"#;
+    let actual = query.to_explain().unwrap();
+    assert_eq!(expected.to_smolstr(), actual);
+}
diff --git a/sbroad-core/src/ir/node/relational.rs b/sbroad-core/src/ir/node/relational.rs
index f8fb6302c..da91e6c89 100644
--- a/sbroad-core/src/ir/node/relational.rs
+++ b/sbroad-core/src/ir/node/relational.rs
@@ -657,6 +657,56 @@ impl Relational<'_> {
         matches!(self, &Relational::Motion { .. })
     }
 
+    /// Checks that the node is a local motion.
+    /// Such motions are not dispatched by executor
+    #[must_use]
+    pub fn is_local_motion(&self) -> bool {
+        use crate::ir::MotionPolicy;
+
+        matches!(
+            self,
+            &Relational::Motion(Motion {
+                policy: MotionPolicy::Local | MotionPolicy::LocalSegment(_),
+                ..
+            })
+        )
+    }
+
+    #[must_use]
+    pub fn is_non_local_motion(&self) -> bool {
+        self.is_motion() && !self.is_local_motion()
+    }
+
+    /// Return true, if this node serves as a
+    /// data source node: it provides data to
+    /// upper operators and does not have children.
+    #[must_use]
+    pub fn is_data_source(&self) -> bool {
+        match self {
+            Relational::ScanRelation(_) => true,
+            Relational::SelectWithoutScan(SelectWithoutScan { children, .. })
+            | Relational::Values(Values { children, .. }) => children.is_empty(),
+            Relational::ScanCte(_)
+            | Relational::Motion(_)
+            | Relational::Except(_)
+            | Relational::Delete(_)
+            | Relational::Insert(_)
+            | Relational::Intersect(_)
+            | Relational::Update(_)
+            | Relational::Join(_)
+            | Relational::Limit(_)
+            | Relational::Projection(_)
+            | Relational::ScanSubQuery(_)
+            | Relational::Selection(_)
+            | Relational::GroupBy(_)
+            | Relational::Having(_)
+            | Relational::OrderBy(_)
+            | Relational::UnionAll(_)
+            | Relational::Union(_)
+            | Relational::ValuesRow(_) => false,
+        }
+    }
+
     /// Checks that the node is a sub-query or CTE scan.
     #[must_use]
     pub fn is_subquery_or_cte(&self) -> bool {
-- 
GitLab