From fe3389c6937162c7e4eb583583191af372d4fc6f Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Wed, 16 Nov 2022 17:47:14 +0700
Subject: [PATCH] feat: add function to extract execution plan subtree

---
 sbroad-core/src/backend/sql/tree.rs       |  36 ++---
 sbroad-core/src/executor.rs               |   6 +
 sbroad-core/src/executor/ir.rs            | 160 +++++++++++++++++++++-
 sbroad-core/src/executor/tests.rs         |   5 +-
 sbroad-core/src/executor/tests/subtree.rs |  60 ++++++++
 sbroad-core/src/ir/operator.rs            |  60 +++++++-
 sbroad-core/src/ir/tree/subtree.rs        | 159 +++++++++++++++++++--
 7 files changed, 452 insertions(+), 34 deletions(-)
 create mode 100644 sbroad-core/src/executor/tests/subtree.rs

diff --git a/sbroad-core/src/backend/sql/tree.rs b/sbroad-core/src/backend/sql/tree.rs
index 8c1649d23d..f6c73ca9b5 100644
--- a/sbroad-core/src/backend/sql/tree.rs
+++ b/sbroad-core/src/backend/sql/tree.rs
@@ -611,27 +611,29 @@ impl<'p> SyntaxPlan<'p> {
                 Relational::Motion { .. } => {
                     let vtable = self.plan.get_motion_vtable(id)?;
                     let vtable_alias = vtable.get_alias().map(String::from);
-                    let child_id = self.plan.get_motion_child(id)?;
-                    let child_rel = self.plan.get_ir_plan().get_relation_node(child_id)?;
                     let mut children: Vec<usize> = Vec::new();
-                    if let Relational::ScanSubQuery { .. } = child_rel {
-                        children = Vec::from([
-                            self.nodes.push_syntax_node(SyntaxNode::new_open()),
-                            self.nodes
-                                .push_syntax_node(SyntaxNode::new_vtable(Rc::clone(&vtable))),
-                            self.nodes.push_syntax_node(SyntaxNode::new_close()),
-                        ]);
+                    if let Ok(child_id) = self.plan.get_motion_child(id) {
+                        let child_rel = self.plan.get_ir_plan().get_relation_node(child_id)?;
+                        if let Relational::ScanSubQuery { .. } = child_rel {
+                            children = Vec::from([
+                                self.nodes.push_syntax_node(SyntaxNode::new_open()),
+                                self.nodes
+                                    .push_syntax_node(SyntaxNode::new_vtable(Rc::clone(&vtable))),
+                                self.nodes.push_syntax_node(SyntaxNode::new_close()),
+                            ]);
 
-                        if let Some(name) = vtable_alias {
-                            children.push(self.nodes.push_syntax_node(SyntaxNode::new_alias(name)));
+                            if let Some(name) = vtable_alias {
+                                children
+                                    .push(self.nodes.push_syntax_node(SyntaxNode::new_alias(name)));
+                            }
+                            let sn = SyntaxNode::new_pointer(id, None, children);
+                            return Ok(self.nodes.push_syntax_node(sn));
                         }
-                    } else {
-                        children.push(
-                            self.nodes
-                                .push_syntax_node(SyntaxNode::new_vtable(Rc::clone(&vtable))),
-                        );
                     }
-
+                    children.push(
+                        self.nodes
+                            .push_syntax_node(SyntaxNode::new_vtable(Rc::clone(&vtable))),
+                    );
                     let sn = SyntaxNode::new_pointer(id, None, children);
                     Ok(self.nodes.push_syntax_node(sn))
                 }
diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs
index fe0e3e2033..c3e8abd518 100644
--- a/sbroad-core/src/executor.rs
+++ b/sbroad-core/src/executor.rs
@@ -131,6 +131,12 @@ where
         &self.exec_plan
     }
 
+    /// Get the mutable reference to the execution plan of the query.
+    #[must_use]
+    pub fn get_mut_exec_plan(&mut self) -> &mut ExecutionPlan {
+        &mut self.exec_plan
+    }
+
     /// Get the coordinator runtime of the query.
     #[must_use]
     pub fn get_coordinator(&self) -> &C {
diff --git a/sbroad-core/src/executor/ir.rs b/sbroad-core/src/executor/ir.rs
index a741bbf901..98545cd74e 100644
--- a/sbroad-core/src/executor/ir.rs
+++ b/sbroad-core/src/executor/ir.rs
@@ -1,12 +1,16 @@
 use std::collections::HashMap;
 use std::rc::Rc;
 
+use ahash::AHashMap;
+use traversal::DftPost;
+
 use crate::errors::QueryPlannerError;
 use crate::errors::QueryPlannerError::CustomError;
 use crate::executor::vtable::VirtualTable;
+use crate::ir::expression::Expression;
 use crate::ir::operator::Relational;
 use crate::ir::transformation::redistribution::MotionPolicy;
-use crate::ir::Plan;
+use crate::ir::{Node, Plan};
 
 #[derive(Debug, Clone)]
 pub struct ExecutionPlan {
@@ -185,4 +189,158 @@ impl ExecutionPlan {
 
         Ok(*child_id)
     }
+
+    /// Build a new execution plan from the subtree of the existing execution plan.
+    ///
+    /// # Errors
+    /// - the original execution plan is invalid
+    #[allow(clippy::too_many_lines)]
+    pub fn new_from_subtree(&self, top_id: usize) -> Result<Self, QueryPlannerError> {
+        let mut map: AHashMap<usize, usize> = AHashMap::new();
+        let mut new_vtables: HashMap<usize, Rc<VirtualTable>> = HashMap::new();
+        let mut new_plan = Plan::new();
+        let ir_plan = self.get_ir_plan();
+        let subtree = DftPost::new(&top_id, |node| ir_plan.exec_plan_subtree_iter(node));
+        for (_, node_id) in subtree {
+            let mut node = ir_plan.get_node(*node_id)?.clone();
+            let next_id = new_plan.nodes.next_id();
+            match node {
+                Node::Relational(ref mut rel) => {
+                    if let Relational::Motion { children, .. } = rel {
+                        if let Some(vtable) =
+                            self.get_vtables().map_or_else(|| None, |v| v.get(node_id))
+                        {
+                            new_vtables.insert(next_id, Rc::clone(vtable));
+                        }
+                        *children = Vec::new();
+                    }
+
+                    if let Some(children) = rel.mut_children() {
+                        for child_id in children {
+                            *child_id = *map.get(child_id).ok_or_else(|| {
+                                QueryPlannerError::CustomError(format!(
+                                    "Failed to build an execution plan subtree: could not find child node id {} in the map",
+                                    child_id
+                                ))
+                            })?;
+                        }
+                    }
+
+                    let output = rel.output();
+                    *rel.mut_output() = *map.get(&output).ok_or_else(|| {
+                        QueryPlannerError::CustomError(format!(
+                            "Failed to build an execution plan subtree: could not find output node id {} in the map",
+                            output
+                        ))
+                    })?;
+                    new_plan.replace_parent_in_subtree(rel.output(), None, Some(next_id))?;
+
+                    if let Relational::Selection {
+                        filter: ref mut expr_id,
+                        ..
+                    }
+                    | Relational::InnerJoin {
+                        condition: ref mut expr_id,
+                        ..
+                    } = rel
+                    {
+                        let oldest_expr_id = ir_plan
+                            .undo
+                            .get_oldest(expr_id)
+                            .map_or_else(|| &*expr_id, |id| id);
+                        *expr_id = *map.get(oldest_expr_id).ok_or_else(|| {
+                            QueryPlannerError::CustomError(format!(
+                                "Failed to build an execution plan subtree: could not find filter/condition node id {} in the map",
+                                oldest_expr_id
+                            ))
+                        })?;
+                        new_plan.replace_parent_in_subtree(*expr_id, None, Some(next_id))?;
+                    }
+
+                    if let Relational::ScanRelation { relation, .. } = rel {
+                        let table = ir_plan.relations.as_ref().and_then(|r| r.get(relation)).ok_or_else(|| {
+                            QueryPlannerError::CustomError(format!(
+                                "Failed to build an execution plan subtree: could not find relation {} in the original plan",
+                                relation
+                            ))
+                        })?.clone();
+                        new_plan.add_rel(table);
+                    }
+                }
+                Node::Expression(ref mut expr) => match expr {
+                    Expression::Alias { ref mut child, .. }
+                    | Expression::Cast { ref mut child, .. }
+                    | Expression::Unary { ref mut child, .. } => {
+                        *child = *map.get(child).ok_or_else(|| {
+                                QueryPlannerError::CustomError(format!(
+                                    "Failed to build an execution plan subtree: could not find child node id {} in the map",
+                                    child
+                                ))
+                            })?;
+                    }
+                    Expression::Bool {
+                        ref mut left,
+                        ref mut right,
+                        ..
+                    }
+                    | Expression::Concat {
+                        ref mut left,
+                        ref mut right,
+                        ..
+                    } => {
+                        *left = *map.get(left).ok_or_else(|| {
+                                QueryPlannerError::CustomError(format!(
+                                    "Failed to build an execution plan subtree: could not find left child node id {} in the map",
+                                    left
+                                ))
+                            })?;
+                        *right = *map.get(right).ok_or_else(|| {
+                                QueryPlannerError::CustomError(format!(
+                                    "Failed to build an execution plan subtree: could not find right child node id {} in the map",
+                                    right
+                                ))
+                            })?;
+                    }
+                    Expression::Reference { ref mut parent, .. } => {
+                        // The new parent node id MUST be set while processing the relational nodes.
+                        *parent = None;
+                    }
+                    Expression::Row {
+                        list: ref mut children,
+                        ..
+                    }
+                    | Expression::StableFunction {
+                        ref mut children, ..
+                    } => {
+                        for child in children {
+                            *child = *map.get(child).ok_or_else(|| {
+                                    QueryPlannerError::CustomError(format!(
+                                        "Failed to build an execution plan subtree: could not find child node id {} in the map",
+                                        child
+                                    ))
+                                })?;
+                        }
+                    }
+                    Expression::Constant { .. } => {}
+                },
+                Node::Parameter { .. } => {}
+            }
+            new_plan.nodes.push(node);
+            map.insert(*node_id, next_id);
+            if top_id == *node_id {
+                new_plan.set_top(next_id)?;
+            }
+        }
+
+        let vtables = if new_vtables.is_empty() {
+            None
+        } else {
+            Some(new_vtables)
+        };
+        let new_exec_plan = ExecutionPlan {
+            plan: new_plan,
+            vtables,
+        };
+        Ok(new_exec_plan)
+    }
 }
diff --git a/sbroad-core/src/executor/tests.rs b/sbroad-core/src/executor/tests.rs
index 7554144cf4..8b3dcd0784 100644
--- a/sbroad-core/src/executor/tests.rs
+++ b/sbroad-core/src/executor/tests.rs
@@ -1377,6 +1377,9 @@ mod concat;
 #[cfg(test)]
 mod empty_motion;
 
+#[cfg(test)]
+mod frontend;
+
 #[cfg(test)]
 mod not_in;
 
@@ -1384,4 +1387,4 @@ mod not_in;
 mod not_eq;
 
 #[cfg(test)]
-mod frontend;
+mod subtree;
diff --git a/sbroad-core/src/executor/tests/subtree.rs b/sbroad-core/src/executor/tests/subtree.rs
new file mode 100644
index 0000000000..a2986bd408
--- /dev/null
+++ b/sbroad-core/src/executor/tests/subtree.rs
@@ -0,0 +1,60 @@
+use pretty_assertions::assert_eq;
+
+use crate::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan};
+use crate::executor::engine::mock::RouterRuntimeMock;
+use crate::ir::transformation::redistribution::MotionPolicy;
+use crate::ir::tree::Snapshot;
+
+use super::*;
+
+#[test]
+fn exec_plan_subtree_test() {
+    let sql = r#"SELECT "FIRST_NAME" FROM "test_space" where "id" in
+    (SELECT "identification_number" FROM "hash_testing" where "identification_number" > 1)"#;
+    let coordinator = RouterRuntimeMock::new();
+
+    let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
+    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let mut virtual_table = virtual_table_23();
+    if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion_id)
+    {
+        query
+            .reshard_vtable(&mut virtual_table, key, &DataGeneration::None)
+            .unwrap();
+    }
+    let mut vtables: HashMap<usize, Rc<VirtualTable>> = HashMap::new();
+    vtables.insert(motion_id, Rc::new(virtual_table));
+
+    let exec_plan = query.get_mut_exec_plan();
+    exec_plan.set_vtables(vtables);
+    let top_id = exec_plan.get_ir_plan().get_top().unwrap();
+    let motion_child_id = exec_plan.get_motion_subtree_root(motion_id).unwrap();
+
+    // Check sub-query
+    let subplan1 = exec_plan.new_from_subtree(motion_child_id).unwrap();
+    let subplan1_top_id = subplan1.get_ir_plan().get_top().unwrap();
+    let sp = SyntaxPlan::new(&subplan1, subplan1_top_id, Snapshot::Oldest).unwrap();
+    let ordered = OrderedSyntaxNodes::try_from(sp).unwrap();
+    let nodes = ordered.to_syntax_data().unwrap();
+    let sql = subplan1.to_sql(&nodes, &Buckets::All).unwrap();
+    assert_eq!(
+        sql,
+        PatternWithParams::new(
+            r#"SELECT "hash_testing"."identification_number" FROM "hash_testing" WHERE ("hash_testing"."identification_number") > (?)"#.to_string(),
+            vec![Value::from(1_u64)]
+        ));
+
+    // Check main query
+    let subplan2 = exec_plan.new_from_subtree(top_id).unwrap();
+    let subplan2_top_id = subplan2.get_ir_plan().get_top().unwrap();
+    let sp = SyntaxPlan::new(&subplan2, subplan2_top_id, Snapshot::Oldest).unwrap();
+    let ordered = OrderedSyntaxNodes::try_from(sp).unwrap();
+    let nodes = ordered.to_syntax_data().unwrap();
+    let sql = subplan2.to_sql(&nodes, &Buckets::All).unwrap();
+    assert_eq!(
+        sql,
+        PatternWithParams::new(
+            r#"SELECT "test_space"."FIRST_NAME" FROM "test_space" WHERE ("test_space"."id") in (SELECT COLUMN_2 as "identification_number" FROM (VALUES (?),(?)))"#.to_string(),
+            vec![Value::from(2_u64), Value::from(3_u64)]
+        ));
+}
diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs
index b438b6967b..7d5ff3c8ac 100644
--- a/sbroad-core/src/ir/operator.rs
+++ b/sbroad-core/src/ir/operator.rs
@@ -273,7 +273,7 @@ impl Relational {
         ))
     }
 
-    /// Gets output tuple node index in plan node arena.
+    /// Gets the id of the output tuple node in the plan's arena.
     #[must_use]
     pub fn output(&self) -> usize {
         match self {
@@ -291,7 +291,25 @@ impl Relational {
         }
     }
 
-    // Gets a copy of the children nodes.
+    /// Gets a mutable reference to the output tuple node id.
+    #[must_use]
+    pub fn mut_output(&mut self) -> &mut usize {
+        match self {
+            Relational::Except { output, .. }
+            | Relational::InnerJoin { output, .. }
+            | Relational::Insert { output, .. }
+            | Relational::Motion { output, .. }
+            | Relational::Projection { output, .. }
+            | Relational::ScanRelation { output, .. }
+            | Relational::ScanSubQuery { output, .. }
+            | Relational::Selection { output, .. }
+            | Relational::UnionAll { output, .. }
+            | Relational::Values { output, .. }
+            | Relational::ValuesRow { output, .. } => output,
+        }
+    }
+
+    // Gets an immutable reference to the children nodes.
     #[must_use]
     pub fn children(&self) -> Option<&[usize]> {
         match self {
@@ -309,6 +327,44 @@ impl Relational {
         }
     }
 
+    // Gets a mutable reference to the children nodes.
+    #[must_use]
+    pub fn mut_children(&mut self) -> Option<&mut [usize]> {
+        match self {
+            Relational::Except {
+                ref mut children, ..
+            }
+            | Relational::InnerJoin {
+                ref mut children, ..
+            }
+            | Relational::Insert {
+                ref mut children, ..
+            }
+            | Relational::Motion {
+                ref mut children, ..
+            }
+            | Relational::Projection {
+                ref mut children, ..
+            }
+            | Relational::ScanSubQuery {
+                ref mut children, ..
+            }
+            | Relational::Selection {
+                ref mut children, ..
+            }
+            | Relational::UnionAll {
+                ref mut children, ..
+            }
+            | Relational::ValuesRow {
+                ref mut children, ..
+            }
+            | Relational::Values {
+                ref mut children, ..
+            } => Some(children),
+            Relational::ScanRelation { .. } => None,
+        }
+    }
+
     /// Checks if the node is an insertion.
     #[must_use]
     pub fn is_insert(&self) -> bool {
diff --git a/sbroad-core/src/ir/tree/subtree.rs b/sbroad-core/src/ir/tree/subtree.rs
index 17845d0335..7efc8c2d0a 100644
--- a/sbroad-core/src/ir/tree/subtree.rs
+++ b/sbroad-core/src/ir/tree/subtree.rs
@@ -6,7 +6,10 @@ use crate::ir::expression::Expression;
 use crate::ir::operator::Relational;
 use crate::ir::{Node, Nodes, Plan};
 
-trait SubtreePlanIterator<'plan>: PlanTreeIterator<'plan> {}
+trait SubtreePlanIterator<'plan>: PlanTreeIterator<'plan> {
+    fn need_output(&self) -> bool;
+    fn need_motion_subtree(&self) -> bool;
+}
 
 /// Expression and relational nodes iterator.
 #[allow(clippy::module_name_repetitions)]
@@ -37,7 +40,15 @@ impl<'plan> PlanTreeIterator<'plan> for SubtreeIterator<'plan> {
     }
 }
 
-impl<'plan> SubtreePlanIterator<'plan> for SubtreeIterator<'plan> {}
+impl<'plan> SubtreePlanIterator<'plan> for SubtreeIterator<'plan> {
+    fn need_output(&self) -> bool {
+        false
+    }
+
+    fn need_motion_subtree(&self) -> bool {
+        true
+    }
+}
 
 impl<'plan> Iterator for SubtreeIterator<'plan> {
     type Item = &'plan usize;
@@ -89,7 +100,15 @@ impl<'plan> PlanTreeIterator<'plan> for FlashbackSubtreeIterator<'plan> {
     }
 }
 
-impl<'plan> SubtreePlanIterator<'plan> for FlashbackSubtreeIterator<'plan> {}
+impl<'plan> SubtreePlanIterator<'plan> for FlashbackSubtreeIterator<'plan> {
+    fn need_output(&self) -> bool {
+        false
+    }
+
+    fn need_motion_subtree(&self) -> bool {
+        true
+    }
+}
 
 impl<'plan> Iterator for FlashbackSubtreeIterator<'plan> {
     type Item = &'plan usize;
@@ -113,6 +132,66 @@ impl<'plan> Plan {
     }
 }
 
+/// An iterator used while copying an execution plan subtree.
+#[derive(Debug)]
+pub struct ExecPlanSubtreeIterator<'plan> {
+    current: &'plan usize,
+    child: RefCell<usize>,
+    plan: &'plan Plan,
+}
+
+impl<'nodes> TreeIterator<'nodes> for ExecPlanSubtreeIterator<'nodes> {
+    fn get_current(&self) -> &'nodes usize {
+        self.current
+    }
+
+    fn get_child(&self) -> &RefCell<usize> {
+        &self.child
+    }
+
+    fn get_nodes(&self) -> &'nodes Nodes {
+        &self.plan.nodes
+    }
+}
+
+impl<'plan> PlanTreeIterator<'plan> for ExecPlanSubtreeIterator<'plan> {
+    fn get_plan(&self) -> &'plan Plan {
+        self.plan
+    }
+}
+
+impl<'plan> SubtreePlanIterator<'plan> for ExecPlanSubtreeIterator<'plan> {
+    fn need_output(&self) -> bool {
+        true
+    }
+
+    fn need_motion_subtree(&self) -> bool {
+        false
+    }
+}
+
+impl<'plan> Iterator for ExecPlanSubtreeIterator<'plan> {
+    type Item = &'plan usize;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        subtree_next(self, &Snapshot::Oldest)
+    }
+}
+
+impl<'plan> Plan {
+    #[must_use]
+    pub fn exec_plan_subtree_iter(
+        &'plan self,
+        current: &'plan usize,
+    ) -> ExecPlanSubtreeIterator<'plan> {
+        ExecPlanSubtreeIterator {
+            current,
+            child: RefCell::new(0),
+            plan: self,
+        }
+    }
+}
+
 #[allow(clippy::too_many_lines)]
 fn subtree_next<'plan>(
     iter: &mut impl SubtreePlanIterator<'plan>,
@@ -173,7 +252,7 @@ fn subtree_next<'plan>(
                             .get_relational_from_reference_node(*iter.get_current())
                         {
                             match iter.get_plan().get_relation_node(*rel_id) {
-                                Ok(rel_node) if rel_node.is_subquery() => {
+                                Ok(rel_node) if rel_node.is_subquery() || rel_node.is_motion() => {
                                     // Check if the sub-query is an additional one.
                                     let parent = iter.get_plan().get_relation_node(parent_id);
                                     let mut is_additional = false;
@@ -219,16 +298,49 @@ fn subtree_next<'plan>(
                     }
                 }
 
-                Relational::Except { children, .. }
-                | Relational::Insert { children, .. }
-                | Relational::Motion { children, .. }
-                | Relational::ScanSubQuery { children, .. }
-                | Relational::UnionAll { children, .. } => {
+                Relational::Except {
+                    children, output, ..
+                }
+                | Relational::Insert {
+                    children, output, ..
+                }
+                | Relational::ScanSubQuery {
+                    children, output, ..
+                }
+                | Relational::UnionAll {
+                    children, output, ..
+                } => {
                     let step = *iter.get_child().borrow();
                     if step < children.len() {
                         *iter.get_child().borrow_mut() += 1;
                         return children.get(step);
                     }
+                    if iter.need_output() && step == children.len() {
+                        *iter.get_child().borrow_mut() += 1;
+                        return Some(output);
+                    }
+                    None
+                }
+                Relational::Motion {
+                    children, output, ..
+                } => {
+                    if iter.need_motion_subtree() {
+                        let step = *iter.get_child().borrow();
+                        if step < children.len() {
+                            *iter.get_child().borrow_mut() += 1;
+                            return children.get(step);
+                        }
+                        if iter.need_output() && step == children.len() {
+                            *iter.get_child().borrow_mut() += 1;
+                            return Some(output);
+                        }
+                    } else {
+                        let step = *iter.get_child().borrow();
+                        if iter.need_output() && step == 0 {
+                            *iter.get_child().borrow_mut() += 1;
+                            return Some(output);
+                        }
+                    }
                     None
                 }
                 Relational::Values {
@@ -248,7 +360,10 @@ fn subtree_next<'plan>(
                     None
                 }
                 Relational::Selection {
-                    children, filter, ..
+                    children,
+                    filter,
+                    output,
+                    ..
                 } => {
                     let step = *iter.get_child().borrow();
 
@@ -268,19 +383,37 @@ fn subtree_next<'plan>(
                                 );
                             }
                         },
-                        Ordering::Greater => None,
+                        Ordering::Greater => {
+                            if step == 2 && iter.need_output() {
+                                return Some(output);
+                            }
+                            None
+                        }
                     }
                 }
-                Relational::ValuesRow { data, .. } => {
+                Relational::ValuesRow { data, output, .. } => {
                     let step = *iter.get_child().borrow();
 
                     *iter.get_child().borrow_mut() += 1;
                     if step == 0 {
                         return Some(data);
                     }
+                    if iter.need_output() && step == 1 {
+                        return Some(output);
+                    }
+                    None
+                }
+                Relational::ScanRelation { output, .. } => {
+                    if iter.need_output() {
+                        let step = *iter.get_child().borrow();
+
+                        *iter.get_child().borrow_mut() += 1;
+                        if step == 0 {
+                            return Some(output);
+                        }
+                    }
                     None
                 }
-                Relational::ScanRelation { .. } => None,
             },
         };
     }
-- 
GitLab