From eea0c244315c1118192bd0b4c824ba09b05d2c41 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Fri, 25 Mar 2022 11:15:18 +0700
Subject: [PATCH] feat: add motion node support for join query

Add support join query to linker, now executor can execute join query with distribution conflict (plan has motion nodes). Extend text in some error
---
 Makefile                                      |   2 +-
 src/executor/engine/cartridge.rs              |  16 +-
 .../engine/cartridge/backend/sql/ir.rs        | 123 +++++++-----
 .../engine/cartridge/backend/sql/tree.rs      |  37 ++--
 src/executor/engine/mock.rs                   |  17 +-
 src/executor/ir.rs                            |  20 +-
 src/executor/result.rs                        |   4 +-
 src/executor/result/tests.rs                  |   4 +-
 src/executor/shard.rs                         |   1 -
 src/executor/tests.rs                         | 181 +++++++++++++++++-
 src/executor/vtable.rs                        |  29 ++-
 src/executor/vtable/tests.rs                  |   6 +-
 src/frontend/sql/ir.rs                        |   1 +
 src/frontend/sql/ir/tests.rs                  |   7 +-
 src/ir/expression.rs                          |   5 +-
 src/ir/expression/tests.rs                    |   7 +-
 src/ir/operator.rs                            |   5 +-
 src/ir/operator/tests.rs                      |   4 +-
 src/ir/relation.rs                            |   8 +-
 src/ir/relation/tests.rs                      |   8 +-
 test_app/test/integration/api_test.lua        |  60 ++++++
 21 files changed, 449 insertions(+), 96 deletions(-)

diff --git a/Makefile b/Makefile
index d1f6d309cf..9c43952cca 100644
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ build:
 	cargo build --release
 
 integration_test_app:
-	cd test_app && rm -rf tmp/tarantool.log && TARANTOOL_LOG=tmp/tarantool.log ./.rocks/bin/luatest --coverage -v test/
+	cd test_app && rm -rf tmp/tarantool.log && TARANTOOL_LOG_LEVEL=7 TARANTOOL_LOG=tmp/tarantool.log ./.rocks/bin/luatest --coverage -v test/
 
 test:
 	cargo test
diff --git a/src/executor/engine/cartridge.rs b/src/executor/engine/cartridge.rs
index 1591d995a7..2d2b5fe233 100644
--- a/src/executor/engine/cartridge.rs
+++ b/src/executor/engine/cartridge.rs
@@ -68,6 +68,14 @@ impl Engine for Runtime {
         let mut result = BoxExecuteFormat::new();
         let sql = plan.subtree_as_sql(top_id)?;
 
+        say(
+            SayLevel::Debug,
+            file!(),
+            line!().try_into().unwrap_or(0),
+            Option::from("exec"),
+            &format!("exec query: {:?}", sql),
+        );
+
         if let Some(shard_keys) = plan.discovery(top_id)? {
             // sending query to nodes
             for shard in shard_keys {
@@ -92,7 +100,13 @@ impl Engine for Runtime {
         let top = &plan.get_motion_subtree_root(motion_node_id)?;
 
         let result = self.exec(plan, *top)?;
-        return result.as_virtual_table(&format!("motion_{}", motion_node_id));
+        let mut vtable = result.as_virtual_table()?;
+
+        if let Some(name) = &plan.get_motion_alias(motion_node_id)? {
+            vtable.set_alias(name)?;
+        }
+
+        Ok(vtable)
     }
 
     /// Calculation ``bucket_id`` function
diff --git a/src/executor/engine/cartridge/backend/sql/ir.rs b/src/executor/engine/cartridge/backend/sql/ir.rs
index a1a6ad7825..0d9fc14be6 100644
--- a/src/executor/engine/cartridge/backend/sql/ir.rs
+++ b/src/executor/engine/cartridge/backend/sql/ir.rs
@@ -103,6 +103,14 @@ impl<'e> ExecutionPlan<'e> {
         };
 
         let ir_plan = self.get_ir_plan();
+
+        // The starting value of the counter for generating anonymous column names.
+        // It emulates Tarantool parser (generated by lemon parser generator).
+        // It traverses the parse tree bottom-up from left to right and increments
+        // the global counter with the columns that need an auto-generated name.
+        // As a result each such column would be named like "COLUMN_%d", where "%d"
+        // is the new global counter value.
+        let mut anonymous_col_idx_base = 0;
         for (id, data) in nodes.iter().enumerate() {
             if let Some(' ' | '(') = sql.chars().last() {
             } else if need_delim_after(id) {
@@ -119,64 +127,75 @@ impl<'e> ExecutionPlan<'e> {
                 SyntaxData::From => sql.push_str("FROM"),
                 SyntaxData::Operator(s) => sql.push_str(s.as_str()),
                 SyntaxData::OpenParenthesis => sql.push('('),
-                SyntaxData::PlanId(id) => match ir_plan.get_node(*id)? {
-                    Node::Relational(rel) => match rel {
-                        Relational::InnerJoin { .. } => sql.push_str("INNER JOIN"),
-                        Relational::Motion { .. } => {
-                            return Err(QueryPlannerError::CustomError(
-                                "Motion nodes can't be converted to SQL.".into(),
-                            ))
-                        }
-                        Relational::Projection { .. } => sql.push_str("SELECT"),
-                        Relational::ScanRelation { relation, .. } => {
-                            sql.push_str(relation);
-                        }
-                        Relational::ScanSubQuery { .. } => {}
-                        Relational::Selection { .. } => sql.push_str("WHERE"),
-                        Relational::UnionAll { .. } => sql.push_str("UNION ALL"),
-                    },
-                    Node::Expression(expr) => match expr {
-                        Expression::Alias { .. }
-                        | Expression::Bool { .. }
-                        | Expression::Row { .. } => {}
-                        Expression::Constant { value, .. } => sql.push_str(&format!("{}", value)),
-                        Expression::Reference { position, .. } => {
-                            let rel_id: usize = ir_plan
-                                .get_relational_from_reference_node(*id)?
-                                .into_iter()
-                                .next()
-                                .ok_or_else(|| {
-                                    QueryPlannerError::CustomError(
-                                        "Reference points to a non-relational node.".into(),
-                                    )
-                                })?;
-                            let rel_node = ir_plan.get_relation_node(rel_id)?;
-                            if let Some(name) = rel_node.scan_name(ir_plan, *position)? {
-                                sql.push_str(&format!(
-                                    "{}.{}",
-                                    name,
-                                    &ir_plan.get_alias_from_reference_node(expr)?
-                                ));
-                            } else {
-                                sql.push_str(&ir_plan.get_alias_from_reference_node(expr)?);
+                SyntaxData::PlanId(id) => {
+                    let node = ir_plan.get_node(*id)?;
+                    match node {
+                        Node::Relational(rel) => match rel {
+                            Relational::InnerJoin { .. } => sql.push_str("INNER JOIN"),
+                            Relational::Projection { .. } => sql.push_str("SELECT"),
+                            Relational::ScanRelation { relation, .. } => {
+                                sql.push_str(relation);
                             }
-                        }
-                    },
-                },
-                SyntaxData::VTable(vtable) => {
-                    let mut tuples = Vec::new();
-                    for tuple in vtable.get_tuples() {
-                        tuples.push(format!(
-                            "({})",
-                            (tuple.iter().map(ToString::to_string)).join(",")
-                        ));
+                            Relational::ScanSubQuery { .. } | Relational::Motion { .. } => {}
+                            Relational::Selection { .. } => sql.push_str("WHERE"),
+                            Relational::UnionAll { .. } => sql.push_str("UNION ALL"),
+                        },
+                        Node::Expression(expr) => match expr {
+                            Expression::Alias { .. }
+                            | Expression::Bool { .. }
+                            | Expression::Row { .. } => {}
+                            Expression::Constant { value, .. } => {
+                                sql.push_str(&format!("{}", value));
+                            }
+                            Expression::Reference { position, .. } => {
+                                let rel_id: usize = ir_plan
+                                    .get_relational_from_reference_node(*id)?
+                                    .into_iter()
+                                    .next()
+                                    .ok_or_else(|| {
+                                        QueryPlannerError::CustomError(
+                                            "Reference points to a non-relational node.".into(),
+                                        )
+                                    })?;
+                                let rel_node = ir_plan.get_relation_node(rel_id)?;
+                                let alias = &ir_plan.get_alias_from_reference_node(expr)?;
+
+                                if let Some(name) = rel_node.scan_name(ir_plan, *position)? {
+                                    sql.push_str(&format!("{}.{}", name, alias));
+                                } else {
+                                    sql.push_str(alias);
+                                }
+                            }
+                        },
                     }
+                }
+                SyntaxData::VTable(vtable) => {
+                    // increase base of global counter of anonymous cols
+                    let cols_count = vtable.get_columns().len();
+                    let rows_count = vtable.get_tuples().len();
+                    anonymous_col_idx_base += cols_count * rows_count - (cols_count - 1);
 
-                    sql.push_str(&format!("VALUES {}", tuples.join(",")));
+                    let columns = vtable
+                        .get_columns()
+                        .iter()
+                        .enumerate()
+                        .map(|(i, c)| {
+                            format!("COLUMN_{} as \"{}\"", anonymous_col_idx_base + i, c.name)
+                        })
+                        .collect::<Vec<String>>()
+                        .join(",");
+
+                    let tuples = vtable
+                        .get_tuples()
+                        .iter()
+                        .map(|t| format!("({})", (t.iter().map(ToString::to_string)).join(",")))
+                        .collect::<Vec<String>>()
+                        .join(",");
+
+                    sql.push_str(&format!("SELECT {} FROM (VALUES {})", columns, tuples));
                 }
             }
         }
-
         Ok(sql)
     }
 }
diff --git a/src/executor/engine/cartridge/backend/sql/tree.rs b/src/executor/engine/cartridge/backend/sql/tree.rs
index 2f79df3968..34fdfab598 100644
--- a/src/executor/engine/cartridge/backend/sql/tree.rs
+++ b/src/executor/engine/cartridge/backend/sql/tree.rs
@@ -394,6 +394,7 @@ where
     #[allow(clippy::too_many_lines)]
     pub fn add_plan_node(&mut self, id: usize) -> Result<usize, QueryPlannerError> {
         let ir_plan = self.plan.get_ir_plan();
+
         match ir_plan.get_node(id)? {
             Node::Relational(rel) => match rel {
                 Relational::InnerJoin {
@@ -494,7 +495,19 @@ where
                     Ok(self.nodes.push_syntax_node(sn))
                 }
                 Relational::Motion { .. } => {
-                    let sn = SyntaxNode::new_pointer(id, None, &[]);
+                    let vtable = self.plan.get_motion_vtable(id)?;
+                    let mut children = Vec::from([
+                        self.nodes.push_syntax_node(SyntaxNode::new_open()),
+                        self.nodes
+                            .push_syntax_node(SyntaxNode::new_vtable(vtable.clone())),
+                        self.nodes.push_syntax_node(SyntaxNode::new_close()),
+                    ]);
+
+                    if let Some(name) = &vtable.get_alias() {
+                        children.push(self.nodes.push_syntax_node(SyntaxNode::new_alias(name)));
+                    }
+
+                    let sn = SyntaxNode::new_pointer(id, None, &children);
                     Ok(self.nodes.push_syntax_node(sn))
                 }
             },
@@ -515,17 +528,19 @@ where
                     if let Some(motion_id) = ir_plan.get_motion_from_row(id)? {
                         // Replace motion node to virtual table node
                         let vtable = self.plan.get_motion_vtable(motion_id)?;
-                        let sn = SyntaxNode::new_pointer(
-                            id,
-                            None,
-                            &[
-                                self.nodes.push_syntax_node(SyntaxNode::new_open()),
-                                self.nodes.push_syntax_node(SyntaxNode::new_vtable(vtable)),
-                                self.nodes.push_syntax_node(SyntaxNode::new_close()),
-                            ],
-                        );
+                        if vtable.get_alias().is_none() {
+                            let sn = SyntaxNode::new_pointer(
+                                id,
+                                None,
+                                &[
+                                    self.nodes.push_syntax_node(SyntaxNode::new_open()),
+                                    self.nodes.push_syntax_node(SyntaxNode::new_vtable(vtable)),
+                                    self.nodes.push_syntax_node(SyntaxNode::new_close()),
+                                ],
+                            );
 
-                        return Ok(self.nodes.push_syntax_node(sn));
+                            return Ok(self.nodes.push_syntax_node(sn));
+                        }
                     }
 
                     if let Some(sq_id) = ir_plan.get_sub_query_from_row_node(id)? {
diff --git a/src/executor/engine/mock.rs b/src/executor/engine/mock.rs
index dd8948ccab..4280872727 100644
--- a/src/executor/engine/mock.rs
+++ b/src/executor/engine/mock.rs
@@ -7,6 +7,7 @@ use crate::executor::ir::ExecutionPlan;
 use crate::executor::result::{BoxExecuteFormat, Value};
 use crate::executor::vtable::VirtualTable;
 use crate::executor::Metadata;
+use crate::ir::operator::Relational;
 use crate::ir::relation::{Column, Table, Type};
 use crate::ir::value::Value as IrValue;
 
@@ -135,10 +136,20 @@ impl Engine for EngineMock {
 
     fn materialize_motion(
         &self,
-        _plan: &mut ExecutionPlan,
-        _motion_node_id: usize,
+        plan: &mut ExecutionPlan,
+        motion_node_id: usize,
     ) -> Result<VirtualTable, QueryPlannerError> {
-        let mut vtable = VirtualTable::new("test");
+        let sq_id = &plan.get_motion_child(motion_node_id)?;
+
+        let mut vtable = VirtualTable::new();
+
+        if let Relational::ScanSubQuery { alias, .. } =
+            &plan.get_ir_plan().get_relation_node(*sq_id)?
+        {
+            if let Some(name) = alias {
+                vtable.set_alias(name)?;
+            }
+        }
 
         vtable.add_column(Column {
             name: "identification_number".into(),
diff --git a/src/executor/ir.rs b/src/executor/ir.rs
index b47bd2f7c0..e212dbdfe1 100644
--- a/src/executor/ir.rs
+++ b/src/executor/ir.rs
@@ -82,6 +82,21 @@ impl<'e> ExecutionPlan<'e> {
         )))
     }
 
+    /// Get motion alias name
+    ///
+    /// # Errors
+    /// - node is not valid
+    pub fn get_motion_alias(&self, node_id: usize) -> Result<Option<String>, QueryPlannerError> {
+        let sq_id = &self.get_motion_child(node_id)?;
+        if let Relational::ScanSubQuery { alias, .. } =
+            self.get_ir_plan().get_relation_node(*sq_id)?
+        {
+            return Ok(alias.clone());
+        }
+
+        Ok(None)
+    }
+
     /// Get root from motion sub tree
     ///
     /// # Errors
@@ -96,7 +111,10 @@ impl<'e> ExecutionPlan<'e> {
     /// # Errors
     /// - node is not `Relation` type
     /// - node does not contain children
-    fn get_motion_child(&self, node_id: usize) -> Result<usize, QueryPlannerError> {
+    pub(in crate::executor) fn get_motion_child(
+        &self,
+        node_id: usize,
+    ) -> Result<usize, QueryPlannerError> {
         let node = self.get_ir_plan().get_relation_node(node_id)?;
         if !node.is_motion() {
             return Err(CustomError(format!(
diff --git a/src/executor/result.rs b/src/executor/result.rs
index 7910c5ffdd..cefd59a180 100644
--- a/src/executor/result.rs
+++ b/src/executor/result.rs
@@ -111,8 +111,8 @@ impl BoxExecuteFormat {
     /// # Errors
     /// - convert to virtual table error
     #[allow(dead_code)]
-    pub fn as_virtual_table(&self, name: &str) -> Result<VirtualTable, QueryPlannerError> {
-        let mut result = VirtualTable::new(name);
+    pub fn as_virtual_table(&self) -> Result<VirtualTable, QueryPlannerError> {
+        let mut result = VirtualTable::new();
 
         for col in &self.metadata {
             result.add_column(col.clone());
diff --git a/src/executor/result/tests.rs b/src/executor/result/tests.rs
index a96e40fab6..f01efa3361 100644
--- a/src/executor/result/tests.rs
+++ b/src/executor/result/tests.rs
@@ -92,7 +92,7 @@ fn convert_to_vtable() {
         ],
     };
 
-    let mut excepted = VirtualTable::new("test_vtable");
+    let mut excepted = VirtualTable::new();
 
     excepted.add_column(Column {
         name: "id".into(),
@@ -126,5 +126,5 @@ fn convert_to_vtable() {
         IrValue::number_from_str("2.0").unwrap(),
     ]);
 
-    assert_eq!(excepted, r.as_virtual_table("test_vtable").unwrap());
+    assert_eq!(excepted, r.as_virtual_table().unwrap());
 }
diff --git a/src/executor/shard.rs b/src/executor/shard.rs
index e79882173c..f6cf5ba41d 100644
--- a/src/executor/shard.rs
+++ b/src/executor/shard.rs
@@ -96,7 +96,6 @@ impl<'e> ExecutionPlan<'e> {
                     ir_plan.set_distribution(left_id)?;
                 }
                 let left_dist = ir_plan.get_distribution(left_id)?;
-
                 // Gather right constants corresponding to the left keys.
                 if let Distribution::Segment { keys } = left_dist {
                     for key in keys {
diff --git a/src/executor/tests.rs b/src/executor/tests.rs
index 93ac12e51c..e17dc90cd4 100644
--- a/src/executor/tests.rs
+++ b/src/executor/tests.rs
@@ -1,7 +1,9 @@
-use super::*;
+use pretty_assertions::assert_eq;
+
 use crate::executor::engine::mock::EngineMock;
 use crate::executor::result::Value;
-use pretty_assertions::assert_eq;
+
+use super::*;
 
 #[test]
 fn shard_query() {
@@ -104,7 +106,7 @@ fn linker_test() {
                 "{} {} {}",
                 r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME""#,
                 r#"FROM "test_space""#,
-                r#"WHERE ("test_space"."id") in (VALUES (2),(3))"#,
+                r#"WHERE ("test_space"."id") in (SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3)))"#,
             )),
         ],
         vec![
@@ -113,7 +115,7 @@ fn linker_test() {
                 "{} {} {}",
                 r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME""#,
                 r#"FROM "test_space""#,
-                r#"WHERE ("test_space"."id") in (VALUES (2),(3))"#,
+                r#"WHERE ("test_space"."id") in (SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3)))"#,
             )),
         ],
     ]);
@@ -159,7 +161,7 @@ fn union_linker_test() {
                     r#"FROM "test_space_hist""#,
                     r#"WHERE ("test_space_hist"."sys_op") > (0)"#,
                     r#") as "t1""#,
-                    r#"WHERE ("t1"."id") in (VALUES (2),(3))"#
+                    r#"WHERE ("t1"."id") in (SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3)))"#
                 )
             )
         ],
@@ -178,11 +180,178 @@ fn union_linker_test() {
                     r#"FROM "test_space_hist""#,
                     r#"WHERE ("test_space_hist"."sys_op") > (0)"#,
                     r#") as "t1""#,
-                    r#"WHERE ("t1"."id") in (VALUES (2),(3))"#
+                    r#"WHERE ("t1"."id") in (SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3)))"#
+                )
+            )
+        ],
+    ]);
+
+    assert_eq!(expected, result)
+}
+
+#[test]
+fn join_linker_test() {
+    let sql = r#"SELECT *
+FROM
+    (SELECT "id", "FIRST_NAME"
+    FROM "test_space"
+    WHERE "sys_op" < 0
+            AND "sysFrom" >= 0
+    UNION ALL
+    SELECT "id", "FIRST_NAME"
+    FROM "test_space_hist"
+    WHERE "sysFrom" <= 0) AS "t3"
+INNER JOIN
+    (SELECT "identification_number"
+    FROM "hash_testing_hist"
+    WHERE "sys_op" > 0
+    UNION ALL
+    SELECT "identification_number"
+    FROM "hash_single_testing_hist"
+    WHERE "sys_op" <= 0) AS "t8"
+    ON "t3"."id" = "t8"."identification_number"
+WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#;
+
+    let engine = EngineMock::new();
+
+    let mut query = Query::new(engine, sql).unwrap();
+    query.optimize().unwrap();
+
+    let result = query.exec().unwrap();
+
+    let mut expected = BoxExecuteFormat::new();
+
+    expected.rows.extend(vec![
+        vec![
+            Value::String(String::from("query send to [1] shard")),
+            Value::String(
+                format!(
+                    "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}",
+                    r#"SELECT "t3"."id" as "id""#,
+                    r#""t3"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#""t8"."identification_number" as "identification_number""#,
+                    r#"FROM ("#,
+                    r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#"FROM "test_space""#,
+                    r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#,
+                    r#"UNION ALL"#,
+                    r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#"FROM "test_space_hist""#,
+                    r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#,
+                    r#") as "t3""#,
+                    r#"INNER JOIN"#,
+                    r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#,
+                    r#") as "t8""#,
+                    r#"ON ("t3"."id") = ("t8"."identification_number")"#,
+                    r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"#
+                )
+            )
+        ],
+        vec![
+            Value::String(String::from("query send to [2] shard")),
+            Value::String(
+                format!(
+                    "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}",
+                    r#"SELECT "t3"."id" as "id""#,
+                    r#""t3"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#""t8"."identification_number" as "identification_number""#,
+                    r#"FROM ("#,
+                    r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#"FROM "test_space""#,
+                    r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#,
+                    r#"UNION ALL"#,
+                    r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#"FROM "test_space_hist""#,
+                    r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#,
+                    r#") as "t3""#,
+                    r#"INNER JOIN"#,
+                    r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#,
+                    r#") as "t8""#,
+                    r#"ON ("t3"."id") = ("t8"."identification_number")"#,
+                    r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"#
+                )
+            )
+        ],
+        vec![
+            Value::String(String::from("query send to [3] shard")),
+            Value::String(
+                format!(
+                    "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}",
+                    r#"SELECT "t3"."id" as "id""#,
+                    r#""t3"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#""t8"."identification_number" as "identification_number""#,
+                    r#"FROM ("#,
+                    r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#"FROM "test_space""#,
+                    r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#,
+                    r#"UNION ALL"#,
+                    r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#,
+                    r#"FROM "test_space_hist""#,
+                    r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#,
+                    r#") as "t3""#,
+                    r#"INNER JOIN"#,
+                    r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#,
+                    r#") as "t8""#,
+                    r#"ON ("t3"."id") = ("t8"."identification_number")"#,
+                    r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"#
                 )
             )
         ],
     ]);
+    assert_eq!(expected, result)
+}
+
+// select * from "test_1" where "identification_number" in (select COLUMN_2 as "b" from (values (1), (2))) or "identification_number" in (select COLUMN_2 as "c" from (values (3), (4)));
+#[test]
+fn anonymous_col_index_test() {
+    let sql = r#"SELECT * FROM "test_space"
+    WHERE "id" in (SELECT "identification_number" FROM "hash_testing" WHERE "product_units" < 3)
+        OR "id" in (SELECT "identification_number" FROM "hash_testing" WHERE "product_units" > 5)"#;
+
+    let engine = EngineMock::new();
+
+    let mut query = Query::new(engine, sql).unwrap();
+    query.optimize().unwrap();
+
+    let result = query.exec().unwrap();
+
+    let mut expected = BoxExecuteFormat::new();
+    expected.rows.extend(vec![
+        vec![
+            Value::String(String::from("query send to [2] shard")),
+            Value::String(format!(
+                "{} {}, {}, {}, {}, {} {} {} {} {} {}",
+                "SELECT",
+                r#""test_space"."id" as "id""#,
+                r#""test_space"."sysFrom" as "sysFrom""#,
+                r#""test_space"."FIRST_NAME" as "FIRST_NAME""#,
+                r#""test_space"."sys_op" as "sys_op""#,
+                r#""test_space"."bucket_id" as "bucket_id""#,
+                r#"FROM "test_space""#,
+                r#"WHERE (("test_space"."id") in"#,
+                r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3)))"#,
+                r#"or ("test_space"."id") in"#,
+                r#"(SELECT COLUMN_4 as "identification_number" FROM (VALUES (2),(3))))"#,
+            )),
+        ],
+        vec![
+            Value::String(String::from("query send to [3] shard")),
+            Value::String(format!(
+                "{} {}, {}, {}, {}, {} {} {} {} {} {}",
+                "SELECT",
+                r#""test_space"."id" as "id""#,
+                r#""test_space"."sysFrom" as "sysFrom""#,
+                r#""test_space"."FIRST_NAME" as "FIRST_NAME""#,
+                r#""test_space"."sys_op" as "sys_op""#,
+                r#""test_space"."bucket_id" as "bucket_id""#,
+                r#"FROM "test_space""#,
+                r#"WHERE (("test_space"."id") in"#,
+                r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3)))"#,
+                r#"or ("test_space"."id") in"#,
+                r#"(SELECT COLUMN_4 as "identification_number" FROM (VALUES (2),(3))))"#,
+            )),
+        ],
+    ]);
 
     assert_eq!(expected, result)
 }
diff --git a/src/executor/vtable.rs b/src/executor/vtable.rs
index 1f785068bb..45f04eb276 100644
--- a/src/executor/vtable.rs
+++ b/src/executor/vtable.rs
@@ -18,7 +18,7 @@ pub struct VirtualTable {
     /// "Raw" tuples (list of values)
     tuples: Vec<VTableTuple>,
     /// Unique table name (we need to generate it ourselves).
-    name: String,
+    name: Option<String>,
     /// Tuples grouped by the corresponding shards.
     /// Value `HashSet` contains indexes from the `tuples` attribute
     hashing: HashMap<String, HashSet<usize>>,
@@ -26,11 +26,11 @@ pub struct VirtualTable {
 
 impl VirtualTable {
     #[must_use]
-    pub fn new(name: &str) -> Self {
+    pub fn new() -> Self {
         VirtualTable {
             columns: vec![],
             tuples: vec![],
-            name: String::from(name),
+            name: None,
             hashing: HashMap::new(),
         }
     }
@@ -69,6 +69,12 @@ impl VirtualTable {
         self.tuples.clone()
     }
 
+    /// Get vtable columns list
+    #[must_use]
+    pub fn get_columns(&self) -> Vec<Column> {
+        self.columns.clone()
+    }
+
     /// Get tuples was distributed by sharding keys
     #[must_use]
     pub fn get_tuple_distribution(&self) -> HashMap<String, HashSet<usize>> {
@@ -95,6 +101,23 @@ impl VirtualTable {
             };
         }
     }
+
+    /// Set vtable alias name
+    pub fn set_alias(&mut self, name: &str) -> Result<(), QueryPlannerError> {
+        if name.is_empty() {
+            return Err(QueryPlannerError::CustomError(
+                "Can't set empty alias for virtual table".into(),
+            ));
+        }
+
+        self.name = Some(String::from(name));
+        Ok(())
+    }
+
+    /// Get vtable alias name
+    pub fn get_alias(&self) -> Option<String> {
+        self.name.clone()
+    }
 }
 
 #[cfg(test)]
diff --git a/src/executor/vtable/tests.rs b/src/executor/vtable/tests.rs
index 71363ae392..27b67b1d07 100644
--- a/src/executor/vtable/tests.rs
+++ b/src/executor/vtable/tests.rs
@@ -4,7 +4,7 @@ use super::*;
 
 #[test]
 fn virtual_table() {
-    let mut vtable = VirtualTable::new("test");
+    let mut vtable = VirtualTable::new();
 
     vtable.add_column(Column {
         name: "name".into(),
@@ -13,13 +13,15 @@ fn virtual_table() {
 
     vtable.add_values_tuple(vec![Value::number_from_str("1").unwrap()]);
 
+    vtable.set_alias("test").unwrap();
+
     let expected = VirtualTable {
         columns: vec![Column {
             name: "name".into(),
             r#type: Type::Integer,
         }],
         tuples: vec![vec![Value::number_from_str("1").unwrap()]],
-        name: String::from("test"),
+        name: Some(String::from("test")),
         hashing: HashMap::new(),
     };
 
diff --git a/src/frontend/sql/ir.rs b/src/frontend/sql/ir.rs
index 699ddbaeb5..d66bb39845 100644
--- a/src/frontend/sql/ir.rs
+++ b/src/frontend/sql/ir.rs
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, HashSet};
+
 use traversal::DftPost;
 
 use crate::errors::QueryPlannerError;
diff --git a/src/frontend/sql/ir/tests.rs b/src/frontend/sql/ir/tests.rs
index d599141cc5..3889c984b3 100644
--- a/src/frontend/sql/ir/tests.rs
+++ b/src/frontend/sql/ir/tests.rs
@@ -199,7 +199,12 @@ fn inner_join_duplicate_columns() {
     let ast = AbstractSyntaxTree::new(query).unwrap();
     let plan_err = ast.to_ir(metadata).unwrap_err();
 
-    assert_eq!(QueryPlannerError::DuplicateColumn, plan_err);
+    assert_eq!(
+        QueryPlannerError::CustomError(
+            "Row can't be added because `\"sys_op\"` already has an alias".into()
+        ),
+        plan_err
+    );
 }
 
 #[test]
diff --git a/src/ir/expression.rs b/src/ir/expression.rs
index 88a18c2bba..578f669215 100644
--- a/src/ir/expression.rs
+++ b/src/ir/expression.rs
@@ -260,7 +260,10 @@ impl Nodes {
                 .ok_or(QueryPlannerError::InvalidNode)?
             {
                 if !names.insert(String::from(name)) {
-                    return Err(QueryPlannerError::DuplicateColumn);
+                    return Err(QueryPlannerError::CustomError(format!(
+                        "Row can't be added because `{}` already has an alias",
+                        name
+                    )));
                 }
             } else {
                 return Err(QueryPlannerError::InvalidRow);
diff --git a/src/ir/expression/tests.rs b/src/ir/expression/tests.rs
index c41d49d74b..d25f296220 100644
--- a/src/ir/expression/tests.rs
+++ b/src/ir/expression/tests.rs
@@ -1,7 +1,8 @@
+use pretty_assertions::assert_eq;
+
 use crate::ir::relation::*;
 use crate::ir::value::*;
 use crate::ir::*;
-use pretty_assertions::assert_eq;
 
 #[test]
 fn row_duplicate_column_names() {
@@ -12,7 +13,9 @@ fn row_duplicate_column_names() {
     let c2 = plan.nodes.add_const(Value::number_from_str("2").unwrap());
     let c2_alias_a = plan.nodes.add_alias("a", c2).unwrap();
     assert_eq!(
-        QueryPlannerError::DuplicateColumn,
+        QueryPlannerError::CustomError(
+            "Row can't be added because `a` already has an alias".into()
+        ),
         plan.nodes
             .add_row_of_aliases(vec![c1_alias_a, c2_alias_a], None)
             .unwrap_err()
diff --git a/src/ir/operator.rs b/src/ir/operator.rs
index c10c68563b..b952be3ce7 100644
--- a/src/ir/operator.rs
+++ b/src/ir/operator.rs
@@ -277,7 +277,8 @@ impl Relational {
             Relational::ScanSubQuery { alias, .. } => Ok(alias.as_deref()),
             Relational::Projection { .. }
             | Relational::Selection { .. }
-            | Relational::InnerJoin { .. } => {
+            | Relational::InnerJoin { .. }
+            | Relational::Motion { .. } => {
                 let output_row = plan.get_expression_node(self.output())?;
                 let list = output_row.extract_row_list()?;
                 let col_id = *list.get(position).ok_or_else(|| {
@@ -302,7 +303,7 @@ impl Relational {
                 }
                 Ok(None)
             }
-            _ => Ok(None),
+            Relational::UnionAll { .. } => Ok(None),
         }
     }
 
diff --git a/src/ir/operator/tests.rs b/src/ir/operator/tests.rs
index 5a9865d2bd..4b15e57c9d 100644
--- a/src/ir/operator/tests.rs
+++ b/src/ir/operator/tests.rs
@@ -413,7 +413,9 @@ fn join_duplicate_columns() {
         .unwrap();
     let condition = plan.nodes.add_bool(a_row, Bool::Eq, d_row).unwrap();
     assert_eq!(
-        QueryPlannerError::DuplicateColumn,
+        QueryPlannerError::CustomError(
+            "Row can't be added because `a` already has an alias".into()
+        ),
         plan.add_join(scan_t1, scan_t2, condition).unwrap_err()
     );
 }
diff --git a/src/ir/relation.rs b/src/ir/relation.rs
index 0f9bd6afe3..986406ed34 100644
--- a/src/ir/relation.rs
+++ b/src/ir/relation.rs
@@ -154,7 +154,9 @@ impl Table {
             .all(|(pos, col)| matches!(pos_map.insert(&col.name, pos), None));
 
         if !no_duplicates {
-            return Err(QueryPlannerError::DuplicateColumn);
+            return Err(QueryPlannerError::CustomError(
+                "Table has duplicated columns and couldn't be loaded".into(),
+            ));
         }
 
         let keys = &k;
@@ -189,7 +191,9 @@ impl Table {
         let no_duplicates = cols.iter().all(|col| uniq_cols.insert(&col.name));
 
         if !no_duplicates {
-            return Err(QueryPlannerError::DuplicateColumn);
+            return Err(QueryPlannerError::CustomError(
+                "Table contains duplicate cols and couldn't convert to yaml.".into(),
+            ));
         }
 
         let in_range = ts.key.positions.iter().all(|pos| *pos < cols.len());
diff --git a/src/ir/relation/tests.rs b/src/ir/relation/tests.rs
index be57cd71e4..468e51e9f9 100644
--- a/src/ir/relation/tests.rs
+++ b/src/ir/relation/tests.rs
@@ -55,7 +55,9 @@ fn table_seg_duplicate_columns() {
             &["b", "a"],
         )
         .unwrap_err(),
-        QueryPlannerError::DuplicateColumn
+        QueryPlannerError::CustomError(
+            "Table has duplicated columns and couldn't be loaded".into()
+        )
     );
 }
 
@@ -114,7 +116,9 @@ fn table_seg_serialized_duplicate_columns() {
     let s = fs::read_to_string(path).unwrap();
     assert_eq!(
         Table::seg_from_yaml(&s).unwrap_err(),
-        QueryPlannerError::DuplicateColumn
+        QueryPlannerError::CustomError(
+            "Table contains duplicate cols and couldn't convert to yaml.".into()
+        )
     );
 }
 
diff --git a/test_app/test/integration/api_test.lua b/test_app/test/integration/api_test.lua
index 1d9e92d692..b8222a80f9 100644
--- a/test_app/test/integration/api_test.lua
+++ b/test_app/test/integration/api_test.lua
@@ -1,4 +1,5 @@
 local t = require('luatest')
+local fiber = require('fiber')
 local g = t.group('integration_api')
 
 local helper = require('test.helper')
@@ -225,3 +226,62 @@ g.test_null_col_result = function()
         },
     })
 end
+
+g.test_join_motion_query = function()
+    local api = cluster:server("api-1").net_box
+
+    local r, err = api:call("query", { [[SELECT "t3"."id", "t3"."name", "t8"."product_units"
+FROM
+    (SELECT "id", "name"
+        FROM "space_simple_shard_key"
+        WHERE "sysOp" > 0
+    UNION ALL
+        SELECT "id", "name"
+        FROM "space_simple_shard_key_hist"
+        WHERE "sysOp" > 0) AS "t3"
+INNER JOIN
+    (SELECT "id" as "id1", "product_units"
+    FROM "testing_space"
+    WHERE "product_units" < 0
+    UNION ALL
+    SELECT "id" as "id1", "product_units"
+    FROM "testing_space_hist"
+    WHERE "product_units" > 0) AS "t8"
+    ON "t3"."id" = "t8"."id1"
+WHERE "t3"."id" = 1]] })
+
+    t.assert_equals(err, nil)
+    t.assert_equals(r, {
+        metadata = {
+            {name = "id", type = "integer"},
+            {name = "name", type = "string"},
+            {name = "product_units", type = "integer"},
+        },
+        rows = {
+            { 1, "ok", 5 },
+            { 1, "ok_hist", 5 }
+        },
+    })
+end
+
+g.test_anonymous_cols_naming = function()
+    local api = cluster:server("api-1").net_box
+
+    local r, err = api:call("query", { [[SELECT * FROM "testing_space"
+    WHERE "id" in (SELECT "id" FROM "space_simple_shard_key_hist" WHERE "sysOp" > 0)
+        OR "id" in (SELECT "id" FROM "space_simple_shard_key_hist" WHERE "sysOp" > 0)
+    ]] })
+
+    t.assert_equals(err, nil)
+    t.assert_equals(r, {
+        metadata = {
+            {name = "id", type = "integer"},
+            {name = "name", type = "string"},
+            {name = "product_units", type = "integer"},
+            {name = "bucket_id", type = "unsigned"},
+        },
+        rows = {
+            {1, "123", 1, 360}
+        },
+    })
+end
\ No newline at end of file
-- 
GitLab