diff --git a/src/executor.rs b/src/executor.rs index f7bf0d0b1f7080c29c210b9790f92344880e00a4..4c2204991463da104ea01124736e63fcf46a10a5 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,86 +1,98 @@ use crate::errors::QueryPlannerError; +use crate::executor::bucket::Buckets; use crate::executor::engine::Engine; pub use crate::executor::engine::Metadata; use crate::executor::ir::ExecutionPlan; use crate::executor::result::BoxExecuteFormat; use crate::frontend::sql::ast::AbstractSyntaxTree; use crate::ir::Plan; +use std::collections::HashMap; +mod bucket; pub mod engine; pub(crate) mod ir; mod result; -mod shard; mod vtable; +impl Plan { + /// Apply optimization rules to the plan. + fn optimize(&mut self) -> Result<(), QueryPlannerError> { + self.replace_in_operator()?; + self.split_columns()?; + self.set_dnf()?; + // TODO: make it a plan method and rename to "derive_equalities()". + self.nodes.add_new_equalities()?; + self.merge_tuples()?; + self.add_motions()?; + Ok(()) + } +} + /// Query object for executing -#[allow(dead_code)] pub struct Query<T> where T: Engine, { - /// Query IR - plan: Plan, - /// Execute engine object + /// Execution plan + exec_plan: ExecutionPlan, + /// Execution engine engine: T, + /// Bucket map + bucket_map: HashMap<usize, Buckets>, } -#[allow(dead_code)] impl<T> Query<T> where T: Engine, { - /// Create query object + /// Create a new query. /// /// # Errors - /// - query isn't valid or not support yet. + /// - Failed to parse SQL. + /// - Failed to build AST. + /// - Failed to build IR plan. + /// - Failed to apply optimizing transformations to IR plan. 
pub fn new(engine: T, sql: &str) -> Result<Self, QueryPlannerError> where T::Metadata: Metadata, { let ast = AbstractSyntaxTree::new(sql)?; - Ok(Query { - plan: ast.to_ir(&engine.metadata())?, + let mut plan = ast.to_ir(engine.metadata())?; + plan.optimize()?; + let query = Query { + exec_plan: ExecutionPlan::from(plan), engine, - }) + bucket_map: HashMap::new(), + }; + Ok(query) } - /// Execute query in cluster + /// Execute distributed query. /// /// # Errors - /// - query can't be executed in cluster - /// - invalid bucket id - pub fn exec(&self) -> Result<BoxExecuteFormat, QueryPlannerError> { - let mut exec_plan = ExecutionPlan::from(&self.plan); - - let ir_plan = exec_plan.get_ir_plan(); - if let Some(slices) = ir_plan.get_slices() { - for motion_level in slices { - for motion_id in motion_level { + /// - Failed to get a motion subtree. + /// - Failed to discover buckets. + /// - Failed to materialize motion result and build a virtual table. + /// - Failed to get plan top. + pub fn exec(&mut self) -> Result<BoxExecuteFormat, QueryPlannerError> { + let slices = self.exec_plan.get_ir_plan().get_slices(); + if let Some(slices) = slices { + for slice in slices { + for motion_id in slice { // TODO: make it work in parallel - let vtable = self.engine.materialize_motion(&mut exec_plan, motion_id)?; - exec_plan.add_motion_result(motion_id, vtable)?; + let top_id = self.exec_plan.get_motion_subtree_root(motion_id)?; + let buckets = self.bucket_discovery(top_id)?; + let virtual_table = + self.engine + .materialize_motion(&mut self.exec_plan, motion_id, &buckets)?; + self.exec_plan.add_motion_result(motion_id, virtual_table)?; } } } - let top = exec_plan.get_ir_plan().get_top()?; - - self.engine.exec(&mut exec_plan, top) - } - - /// Apply optimize rules - /// - /// # Errors - /// - transformation can't be applied - pub fn optimize(&mut self) -> Result<(), QueryPlannerError> { - self.plan.replace_in_operator()?; - self.plan.split_columns()?; - self.plan.set_dnf()?; - // 
TODO: make it a plan method and rename to "derive_equalities()". - self.plan.nodes.add_new_equalities()?; - self.plan.merge_tuples()?; - self.plan.add_motions()?; - Ok(()) + let top_id = self.exec_plan.get_ir_plan().get_top()?; + let buckets = self.bucket_discovery(top_id)?; + self.engine.exec(&mut self.exec_plan, top_id, &buckets) } } diff --git a/src/executor/bucket.rs b/src/executor/bucket.rs new file mode 100644 index 0000000000000000000000000000000000000000..1aa9d752db2de6cb8416593def82a5a28dab968d --- /dev/null +++ b/src/executor/bucket.rs @@ -0,0 +1,372 @@ +use crate::errors::QueryPlannerError; +use crate::executor::engine::Engine; +use crate::executor::Query; +use crate::ir::distribution::Distribution; +use crate::ir::expression::Expression; +use crate::ir::operator::{Bool, Relational}; +use crate::ir::transformation::redistribution::MotionPolicy; +use std::collections::HashSet; +use traversal::DftPost; + +/// Buckets are used to determine which nodes to send the query to. +#[derive(Clone, Debug, PartialEq)] +pub enum Buckets { + /// We don't want to keep thousands of buckets in memory + /// so we use a special enum to represent all the buckets + /// in a cluster. + All, + /// A filtered set of buckets. + Filtered(HashSet<u64>), +} + +impl Buckets { + /// Get all buckets in the cluster. + pub fn new_all() -> Self { + Buckets::All + } + + /// Get a filtered set of buckets. + pub fn new_filtered(buckets: HashSet<u64>) -> Self { + Buckets::Filtered(buckets) + } + + /// Intersection of two sets of buckets (despite the name): `All` is neutral, filtered sets are intersected. + pub fn disjunct(&self, buckets: &Buckets) -> Buckets { + match (self, buckets) { + (Buckets::All, Buckets::All) => Buckets::All, + (Buckets::Filtered(b), Buckets::All) | (Buckets::All, Buckets::Filtered(b)) => { + Buckets::Filtered(b.clone()) + } + (Buckets::Filtered(a), Buckets::Filtered(b)) => { + Buckets::Filtered(a.intersection(b).copied().collect()) + } + } + } + + /// Union of two sets of buckets (despite the name): `All` absorbs everything, filtered sets are united. 
+ pub fn conjunct(&self, buckets: &Buckets) -> Buckets { + match (self, buckets) { + (Buckets::All, _) | (_, Buckets::All) => Buckets::All, + (Buckets::Filtered(a), Buckets::Filtered(b)) => { + Buckets::Filtered(a.union(b).copied().collect()) + } + } + } +} + +impl<T> Query<T> +where + T: Engine, +{ + fn get_buckets_from_expr(&self, expr_id: usize) -> Result<Buckets, QueryPlannerError> { + let mut buckets: Vec<Buckets> = Vec::new(); + let ir_plan = self.exec_plan.get_ir_plan(); + let expr = ir_plan.get_expression_node(expr_id)?; + if let Expression::Bool { + op: Bool::Eq | Bool::In, + left, + right, + .. + } = expr + { + let pairs = vec![(*left, *right), (*right, *left)]; + for (left_id, right_id) in pairs { + let left_expr = ir_plan.get_expression_node(left_id)?; + if !left_expr.is_row() { + return Err(QueryPlannerError::CustomError(format!( + "Left side of equality expression is not a row: {:?}", + left_expr + ))); + } + let right_expr = ir_plan.get_expression_node(right_id)?; + let right_columns = if let Expression::Row { list, .. } = right_expr { + list.clone() + } else { + return Err(QueryPlannerError::CustomError(format!( + "Right side of equality expression is not a row: {:?}", + right_expr + ))); + }; + + // Get the distribution of the left row. + let left_dist = ir_plan.get_distribution(left_id)?; + + // Gather buckets from the right row. + if let Distribution::Segment { keys } = left_dist { + // If the right side is a row referencing to the motion + // it means that the corresponding virtual table contains + // tuple with the same distribution as the left side. + if let Some(motion_id) = ir_plan.get_motion_from_row(right_id)? 
{ + let virtual_table = self.exec_plan.get_motion_vtable(motion_id)?; + let hashed_keys = virtual_table.get_tuple_distribution().keys(); + let mut bucket_ids: HashSet<u64> = HashSet::new(); + for bucket_str in hashed_keys { + bucket_ids.insert(self.engine.determine_bucket_id(bucket_str)); + } + if !bucket_ids.is_empty() { + buckets.push(Buckets::new_filtered(bucket_ids)); + } + } + + // The right side is a regular row with constants + // on the positions of the left keys (if we are lucky). + for key in keys { + let mut values: Vec<String> = Vec::new(); + for position in &key.positions { + let right_column_id = + *right_columns.get(*position).ok_or_else(|| { + QueryPlannerError::CustomError(format!( + "Right row does not have column at position {}", + position + )) + })?; + let right_column_expr = ir_plan.get_expression_node(right_column_id)?; + if let Expression::Constant { .. } = right_column_expr { + values.push(right_column_expr.get_const_value()?.into()); + } else { + // One of the columns is not a constant. Skip this key. + values = Vec::new(); + break; + } + } + if !values.is_empty() { + let bucket_str = values.join(""); + let bucket = self.engine.determine_bucket_id(&bucket_str); + buckets.push(Buckets::new_filtered([bucket].into())); + } + } + } + } + } + + if buckets.is_empty() { + Ok(Buckets::new_all()) + } else { + Ok(buckets + .into_iter() + .fold(Buckets::new_all(), |a, b| a.disjunct(&b))) + } + } + + fn get_expression_tree_buckets(&self, expr_id: usize) -> Result<Buckets, QueryPlannerError> { + let ir_plan = self.exec_plan.get_ir_plan(); + let chains = ir_plan.get_dnf_chains(expr_id)?; + let mut result: Vec<Buckets> = Vec::new(); + for mut chain in chains { + let mut chain_buckets = Buckets::new_all(); + let nodes = chain.get_mut_nodes(); + // Nodes in the chain are in the top-down order (from left to right). + // We need to pop back the chain to get nodes in the bottom-up order. 
+ while let Some(node_id) = nodes.pop_back() { + let node_buckets = self.get_buckets_from_expr(node_id)?; + chain_buckets = chain_buckets.disjunct(&node_buckets); + } + result.push(chain_buckets); + } + + if let Some((first, other)) = result.split_first_mut() { + for buckets in other { + *first = first.conjunct(buckets); + } + return Ok(first.clone()); + } + + Ok(Buckets::All) + } + + /// Discover required buckets to execute the query subtree. + /// + /// # Errors + /// - Relational iterator returns non-relational nodes. + /// - Failed to find a virtual table. + /// - Relational nodes contain invalid children. + #[allow(clippy::too_many_lines)] + pub fn bucket_discovery(&mut self, top_id: usize) -> Result<Buckets, QueryPlannerError> { + let mut nodes: Vec<usize> = Vec::new(); + let ir_plan = self.exec_plan.get_ir_plan(); + let rel_tree = DftPost::new(&top_id, |node| ir_plan.nodes.rel_iter(node)); + for (_, node_id) in rel_tree { + nodes.push(*node_id); + } + + for node_id in nodes { + // The bucket map is keyed by output tuple ids, so memoized nodes must be looked up by their output. + let rel = self.exec_plan.get_ir_plan().get_relation_node(node_id)?; + if self.bucket_map.contains_key(&rel.output()) { + continue; + } + match rel { + Relational::ScanRelation { output, .. } => { + self.bucket_map.insert(*output, Buckets::new_all()); + } + Relational::Motion { policy, output, .. } => match policy { + MotionPolicy::Full => { + self.bucket_map.insert(*output, Buckets::new_all()); + } + MotionPolicy::Segment(_) => { + let virtual_table = self.exec_plan.get_motion_vtable(node_id)?; + let mut buckets: HashSet<u64> = HashSet::new(); + for key in virtual_table.get_tuple_distribution().keys() { + let bucket = self.engine.determine_bucket_id(key); + buckets.insert(bucket); + } + self.bucket_map + .insert(*output, Buckets::new_filtered(buckets)); + } + MotionPolicy::Local => { + return Err(QueryPlannerError::CustomError( + "Local motion policy should never appear in the plan".to_string(), + )); + } + }, + Relational::Projection { + children, output, .. 
+ } + | Relational::ScanSubQuery { + children, output, .. + } => { + let child_id = children.first().ok_or_else(|| { + QueryPlannerError::CustomError( + "Current node should have exactly one child".to_string(), + ) + })?; + let child_rel = self.exec_plan.get_ir_plan().get_relation_node(*child_id)?; + let child_buckets = self + .bucket_map + .get(&child_rel.output()) + .ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the child from the bucket map." + .to_string(), + ) + })? + .clone(); + self.bucket_map.insert(*output, child_buckets); + } + Relational::UnionAll { + children, output, .. + } => { + if let (Some(first_id), Some(second_id), None) = + (children.first(), children.get(1), children.get(2)) + { + let first_rel = + self.exec_plan.get_ir_plan().get_relation_node(*first_id)?; + let second_rel = + self.exec_plan.get_ir_plan().get_relation_node(*second_id)?; + let first_buckets = self.bucket_map.get(&first_rel.output()).ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the first union all child from the bucket map." + .to_string(), + ) + })?; + let second_buckets = self.bucket_map.get(&second_rel.output()).ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the second union all child from the bucket map." + .to_string(), + ) + })?; + let buckets = first_buckets.conjunct(second_buckets); // UNION ALL reads both children: `conjunct` is the set union, `disjunct` would intersect and drop buckets. + self.bucket_map.insert(*output, buckets); + } else { + return Err(QueryPlannerError::CustomError( + "Current node should have exactly two children".to_string(), + )); + } + } + Relational::Selection { + children, + filter, + output, + .. + } => { + // We need to get the buckets of the child node for the case + // when the filter returns no buckets to reduce. 
+ let child_id = children.first().ok_or_else(|| { + QueryPlannerError::CustomError( + "Current node should have exactly one child".to_string(), + ) + })?; + let child_rel = self.exec_plan.get_ir_plan().get_relation_node(*child_id)?; + let child_buckets = self + .bucket_map + .get(&child_rel.output()) + .ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the selection child from the bucket map." + .to_string(), + ) + })? + .clone(); + let output_id = *output; + let filter_id = *filter; + let filter_buckets = self.get_expression_tree_buckets(filter_id)?; + self.bucket_map + .insert(output_id, child_buckets.disjunct(&filter_buckets)); + } + Relational::InnerJoin { + children, + condition, + output, + .. + } => { + if let (Some(inner_id), Some(outer_id)) = (children.first(), children.get(1)) { + let inner_rel = + self.exec_plan.get_ir_plan().get_relation_node(*inner_id)?; + let outer_rel = + self.exec_plan.get_ir_plan().get_relation_node(*outer_id)?; + let inner_buckets = self + .bucket_map + .get(&inner_rel.output()) + .ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the inner child from the bucket map." + .to_string(), + ) + })? + .clone(); + let outer_buckets = self + .bucket_map + .get(&outer_rel.output()) + .ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the outer child from the bucket map." + .to_string(), + ) + })? 
+ .clone(); + let output_id = *output; + let condition_id = *condition; + let filter_buckets = self.get_expression_tree_buckets(condition_id)?; + self.bucket_map.insert( + output_id, + inner_buckets + .conjunct(&outer_buckets) + .disjunct(&filter_buckets), + ); + } else { + return Err(QueryPlannerError::CustomError( + "Current node should have at least two children".to_string(), + )); + } + } + } + } + + let top_rel = self.exec_plan.get_ir_plan().get_relation_node(top_id)?; + let top_buckets = self + .bucket_map + .get(&top_rel.output()) + .ok_or_else(|| { + QueryPlannerError::CustomError( + "Failed to retrieve buckets of the top relation from the bucket map." + .to_string(), + ) + })? + .clone(); + + Ok(top_buckets) + } +} + +#[cfg(test)] +mod tests; diff --git a/src/executor/bucket/tests.rs b/src/executor/bucket/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..dc8e0fb01cdf1711e2bc903c820d59c47957459c --- /dev/null +++ b/src/executor/bucket/tests.rs @@ -0,0 +1,118 @@ +use pretty_assertions::assert_eq; + +use crate::executor::bucket::Buckets; +use crate::executor::engine::mock::EngineMock; +use crate::executor::engine::Engine; +use crate::executor::Query; + +#[test] +fn simple_union_query() { + let query = r#"SELECT * FROM ( + SELECT * FROM "test_space" WHERE "sysFrom" > 0 + UNION ALL + SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0 + ) as "t3" + WHERE "id" = 1"#; + + let engine = EngineMock::new(); + let mut query = Query::new(engine, query).unwrap(); + let plan = query.exec_plan.get_ir_plan(); + let top = plan.get_top().unwrap(); + let buckets = query.bucket_discovery(top).unwrap(); + + let bucket1 = query.engine.determine_bucket_id("1"); + let expected = Buckets::new_filtered([bucket1].into()); + + assert_eq!(expected, buckets); +} + +#[test] +fn simple_disjunction_in_union_query() { + let query = r#"SELECT * FROM ( + SELECT * FROM "test_space" WHERE "sysFrom" > 0 + UNION ALL + SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0 
+ ) as "t3" + WHERE ("id" = 1) OR ("id" = 100)"#; + + let engine = EngineMock::new(); + let mut query = Query::new(engine, query).unwrap(); + let plan = query.exec_plan.get_ir_plan(); + let top = plan.get_top().unwrap(); + let buckets = query.bucket_discovery(top).unwrap(); + + let bucket1 = query.engine.determine_bucket_id("1"); + let bucket100 = query.engine.determine_bucket_id("100"); + let expected = Buckets::new_filtered([bucket1, bucket100].into()); + + assert_eq!(expected, buckets); +} + +#[test] +fn complex_shard_key_union_query() { + let query = r#"SELECT * + FROM + (SELECT "identification_number", "product_code" + FROM "hash_testing" + WHERE "sys_op" = 1 + UNION ALL + SELECT "identification_number", "product_code" + FROM "hash_testing_hist" + WHERE "sys_op" > 1) AS "t3" + WHERE "identification_number" = 1 AND "product_code" = '222'"#; + + let engine = EngineMock::new(); + let mut query = Query::new(engine, query).unwrap(); + let plan = query.exec_plan.get_ir_plan(); + let top = plan.get_top().unwrap(); + let buckets = query.bucket_discovery(top).unwrap(); + + let bucket = query.engine.determine_bucket_id(&["1", "222"].join("")); + let expected = Buckets::new_filtered([bucket].into()); + + assert_eq!(expected, buckets); +} + +#[test] +fn union_complex_cond_query() { + let query = r#"SELECT * + FROM + (SELECT "identification_number", "product_code" + FROM "hash_testing" + WHERE "sys_op" = 1 + UNION ALL + SELECT "identification_number", "product_code" + FROM "hash_testing_hist" + WHERE "sys_op" > 1) AS "t3" + WHERE ("identification_number" = 1 + OR ("identification_number" = 100 + OR "identification_number" = 1000)) + AND ("product_code" = '222' + OR "product_code" = '111')"#; + + let engine = EngineMock::new(); + let mut query = Query::new(engine, query).unwrap(); + let plan = query.exec_plan.get_ir_plan(); + let top = plan.get_top().unwrap(); + let buckets = query.bucket_discovery(top).unwrap(); + + let bucket1222 = query.engine.determine_bucket_id(&["1", 
"222"].join("")); + let bucket100222 = query.engine.determine_bucket_id(&["100", "222"].join("")); + let bucket1000222 = query.engine.determine_bucket_id(&["1000", "222"].join("")); + let bucket1111 = query.engine.determine_bucket_id(&["1", "111"].join("")); + let bucket100111 = query.engine.determine_bucket_id(&["100", "111"].join("")); + let bucket1000111 = query.engine.determine_bucket_id(&["1000", "111"].join("")); + let expected = Buckets::new_filtered( + [ + bucket1222, + bucket100222, + bucket1000222, + bucket1111, + bucket100111, + bucket1000111, + ] + .into(), + ); + + assert_eq!(expected, buckets); +} diff --git a/src/executor/engine.rs b/src/executor/engine.rs index 670be1c336c6a097de7db95942d7745e08417974..9b554bdee88aef2695be1b40c5429b0282ed72cf 100644 --- a/src/executor/engine.rs +++ b/src/executor/engine.rs @@ -1,4 +1,5 @@ use crate::errors::QueryPlannerError; +use crate::executor::bucket::Buckets; use crate::executor::ir::ExecutionPlan; use crate::executor::result::BoxExecuteFormat; use crate::executor::vtable::VirtualTable; @@ -28,7 +29,7 @@ pub trait Engine { type Metadata; /// Return object of metadata storage - fn metadata(&self) -> Self::Metadata + fn metadata(&self) -> &Self::Metadata where Self: Sized; @@ -49,6 +50,7 @@ pub trait Engine { &self, plan: &mut ExecutionPlan, motion_node_id: usize, + buckets: &Buckets, ) -> Result<VirtualTable, QueryPlannerError>; /// Execute sql query on the all shards in cluster @@ -59,10 +61,11 @@ pub trait Engine { &self, plan: &mut ExecutionPlan, top_id: usize, + buckets: &Buckets, ) -> Result<BoxExecuteFormat, QueryPlannerError>; /// Determine shard for query execution by sharding key value - fn determine_bucket_id(&self, s: &str) -> usize; + fn determine_bucket_id(&self, s: &str) -> u64; } #[cfg(test)] diff --git a/src/executor/engine/cartridge.rs b/src/executor/engine/cartridge.rs index 2d2b5fe2333d08c81e8a0b5df265a418507d665a..b46b4bf1827a5906ae43b9540fe5ba1865161b35 100644 --- 
a/src/executor/engine/cartridge.rs +++ b/src/executor/engine/cartridge.rs @@ -4,16 +4,17 @@ use tarantool::log::{say, SayLevel}; use tarantool::tlua::LuaFunction; use crate::errors::QueryPlannerError; -use crate::executor::engine::cartridge::bucket::str_to_bucket_id; +use crate::executor::bucket::Buckets; use crate::executor::engine::cartridge::cache::ClusterSchema; +use crate::executor::engine::cartridge::hash::str_to_bucket_id; use crate::executor::engine::Engine; use crate::executor::ir::ExecutionPlan; use crate::executor::result::BoxExecuteFormat; use crate::executor::vtable::VirtualTable; mod backend; -pub mod bucket; pub mod cache; +pub mod hash; #[derive(Debug, Clone)] pub struct Runtime { @@ -25,8 +26,8 @@ pub struct Runtime { impl Engine for Runtime { type Metadata = ClusterSchema; - fn metadata(&self) -> Self::Metadata { - self.metadata.clone() + fn metadata(&self) -> &Self::Metadata { + &self.metadata } fn has_metadata(&self) -> bool { @@ -64,42 +65,36 @@ impl Engine for Runtime { &self, plan: &mut ExecutionPlan, top_id: usize, + buckets: &Buckets, ) -> Result<BoxExecuteFormat, QueryPlannerError> { let mut result = BoxExecuteFormat::new(); let sql = plan.subtree_as_sql(top_id)?; - say( - SayLevel::Debug, - file!(), - line!().try_into().unwrap_or(0), - Option::from("exec"), - &format!("exec query: {:?}", sql), - ); - - if let Some(shard_keys) = plan.discovery(top_id)? 
{ - // sending query to nodes - for shard in shard_keys { - // exec query on node - let temp_result = self.exec_query(&shard, &sql)?; - result.extend(temp_result)?; + match buckets { + Buckets::All => { + result.extend(cluster_exec_query(&sql)?)?; + } + Buckets::Filtered(list) => { + for bucket in list { + let temp_result = bucket_exec_query(*bucket, &sql)?; + result.extend(temp_result)?; + } } - } else { - let temp_result = mp_exec_query(&sql)?; - - result.extend(temp_result)?; } + Ok(result) } - /// Transform sub query result to virtual table + /// Transform sub query results into a virtual table. fn materialize_motion( &self, plan: &mut ExecutionPlan, motion_node_id: usize, + buckets: &Buckets, ) -> Result<VirtualTable, QueryPlannerError> { let top = &plan.get_motion_subtree_root(motion_node_id)?; - let result = self.exec(plan, *top)?; + let result = self.exec(plan, *top, buckets)?; let mut vtable = result.as_virtual_table()?; if let Some(name) = &plan.get_motion_alias(motion_node_id)? { @@ -109,8 +104,8 @@ impl Engine for Runtime { Ok(vtable) } - /// Calculation ``bucket_id`` function - fn determine_bucket_id(&self, s: &str) -> usize { + /// Calculate bucket for a key. 
+ fn determine_bucket_id(&self, s: &str) -> u64 { str_to_bucket_id(s, self.bucket_count) } } @@ -174,93 +169,79 @@ impl Runtime { Ok(()) } +} - /// Function execute sql query on selected node - fn exec_query( - &self, - shard_key: &str, - query: &str, - ) -> Result<BoxExecuteFormat, QueryPlannerError> { - let cast_bucket_id: u64 = match self.determine_bucket_id(shard_key).try_into() { - Ok(v) => v, - Err(_) => { - return Err(QueryPlannerError::CustomError("Invalid bucket id".into())); - } - }; - - say( - SayLevel::Debug, - file!(), - line!().try_into().unwrap_or(0), - None, - &format!( - "distribution keys is {:?} bucket {:?}", - shard_key, cast_bucket_id - ), - ); - - let lua = tarantool::lua_state(); - match lua.exec( - r#" - local vshard = require('vshard') - local yaml = require('yaml') - - function execute_sql(bucket_id, query) - local res, err = vshard.router.call( - bucket_id, - 'read', - 'box.execute', - { query } - ) - - if err ~= nil then - error(err) - end +/// Execute the query for a single bucket by routing it through `vshard.router.call` in read mode. 
+fn bucket_exec_query(bucket: u64, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { + say( + SayLevel::Debug, + file!(), + line!().try_into().unwrap_or(0), + None, + &format!("Execute a query {:?} on bucket {:?}", query, bucket), + ); - return res + let lua = tarantool::lua_state(); + match lua.exec( + r#" + local vshard = require('vshard') + local yaml = require('yaml') + + function execute_sql(bucket_id, query) + local res, err = vshard.router.call( + bucket_id, + 'read', + 'box.execute', + { query } + ) + + if err ~= nil then + error(err) end - "#, - ) { - Ok(_) => {} - Err(e) => { - say( - SayLevel::Error, - file!(), - line!().try_into().unwrap_or(0), - Option::from("exec_query"), - &format!("{:?}", e), - ); - return Err(QueryPlannerError::LuaError(format!( - "Failed lua code loading: {:?}", - e - ))); - } + + return res + end +"#, + ) { + Ok(_) => {} + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("exec_query"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!( + "Failed lua code loading: {:?}", + e + ))); } + } - let exec_sql: LuaFunction<_> = lua.get("execute_sql").ok_or_else(|| { - QueryPlannerError::LuaError("Lua function `execute_sql` not found".into()) - })?; + let exec_sql: LuaFunction<_> = lua.get("execute_sql").ok_or_else(|| { + QueryPlannerError::LuaError("Lua function `execute_sql` not found".into()) + })?; - let res: BoxExecuteFormat = match exec_sql.call_with_args((cast_bucket_id, query)) { - Ok(v) => v, - Err(e) => { - say( - SayLevel::Error, - file!(), - line!().try_into().unwrap_or(0), - Option::from("exec_query"), - &format!("{:?}", e), - ); - return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); - } - }; + let res: BoxExecuteFormat = match exec_sql.call_with_args((bucket, query)) { + Ok(v) => v, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("exec_query"), + &format!("{:?}", e), 
+ ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; - Ok(res) - } + Ok(res) } -/// Sends query to all instances and merges results after (map-reduce). -fn mp_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { +/// Send the query to all instances and merge results (map-reduce). +fn cluster_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { say( SayLevel::Debug, file!(), @@ -316,7 +297,7 @@ fn mp_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { SayLevel::Error, file!(), line!().try_into().unwrap_or(0), - Option::from("mp_exec_query"), + Option::from("cluster_exec_query"), &format!("{:?}", e), ); return Err(QueryPlannerError::LuaError(format!( @@ -337,7 +318,7 @@ fn mp_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { SayLevel::Error, file!(), line!().try_into().unwrap_or(0), - Option::from("mp_exec_query"), + Option::from("cluster_exec_query"), &format!("{:?}", e), ); return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); diff --git a/src/executor/engine/cartridge/backend/sql/ir.rs b/src/executor/engine/cartridge/backend/sql/ir.rs index f2cec5735b305a7621c957ab36e15e936f92512b..ed0e64179177b3c02e00179b98fa2ae14b8609b6 100644 --- a/src/executor/engine/cartridge/backend/sql/ir.rs +++ b/src/executor/engine/cartridge/backend/sql/ir.rs @@ -3,12 +3,12 @@ use itertools::Itertools; use crate::errors::QueryPlannerError; use crate::executor::ir::ExecutionPlan; use crate::ir::expression::Expression; -use crate::ir::Node; use crate::ir::operator::Relational; +use crate::ir::Node; use super::tree::{SyntaxData, SyntaxPlan}; -impl<'e> ExecutionPlan<'e> { +impl ExecutionPlan { /// Traverse plan sub-tree (pointed by top) in the order /// convenient for SQL serialization. 
/// diff --git a/src/executor/engine/cartridge/backend/sql/ir/tests.rs b/src/executor/engine/cartridge/backend/sql/ir/tests.rs index 72c0a8635d8830cf4a305a78e2c9721ff897d9f8..0623ede977a9add98fbca01d2fcbb4864d4a6699 100644 --- a/src/executor/engine/cartridge/backend/sql/ir/tests.rs +++ b/src/executor/engine/cartridge/backend/sql/ir/tests.rs @@ -13,9 +13,9 @@ fn one_table_projection() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -38,9 +38,9 @@ fn one_table_with_asterisk() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -69,9 +69,9 @@ fn union_all() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -96,9 +96,9 @@ fn from_sub_query() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = 
ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -128,9 +128,9 @@ fn from_sub_query_with_union() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -155,9 +155,9 @@ fn inner_join() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -182,9 +182,9 @@ fn inner_join_with_sq() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -209,8 +209,8 @@ fn selection_with_sq() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); - let top_id = plan.get_top().unwrap(); + let ex_plan = ExecutionPlan::from(plan); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( diff --git a/src/executor/engine/cartridge/backend/sql/tree.rs b/src/executor/engine/cartridge/backend/sql/tree.rs index 34fdfab598f85e5cab36a123b92448fac8f1af71..8a8d665929df1e5650c2d5a55f25208a0f2e244e 100644 --- 
a/src/executor/engine/cartridge/backend/sql/tree.rs +++ b/src/executor/engine/cartridge/backend/sql/tree.rs @@ -380,17 +380,14 @@ impl Select { /// A wrapper over original plan tree. /// We can modify it as we wish without any influence /// on the original plan tree. -pub struct SyntaxPlan<'p, 'e> { +pub struct SyntaxPlan<'p> { pub(crate) nodes: SyntaxNodes, top: Option<usize>, - plan: &'p ExecutionPlan<'e>, + plan: &'p ExecutionPlan, } #[allow(dead_code)] -impl<'p, 'e> SyntaxPlan<'p, 'e> -where - 'e: 'p, -{ +impl<'p> SyntaxPlan<'p> { #[allow(clippy::too_many_lines)] pub fn add_plan_node(&mut self, id: usize) -> Result<usize, QueryPlannerError> { let ir_plan = self.plan.get_ir_plan(); @@ -528,19 +525,17 @@ where if let Some(motion_id) = ir_plan.get_motion_from_row(id)? { // Replace motion node to virtual table node let vtable = self.plan.get_motion_vtable(motion_id)?; - if vtable.get_alias().is_none() { - let sn = SyntaxNode::new_pointer( - id, - None, - &[ - self.nodes.push_syntax_node(SyntaxNode::new_open()), - self.nodes.push_syntax_node(SyntaxNode::new_vtable(vtable)), - self.nodes.push_syntax_node(SyntaxNode::new_close()), - ], - ); + let sn = SyntaxNode::new_pointer( + id, + None, + &[ + self.nodes.push_syntax_node(SyntaxNode::new_open()), + self.nodes.push_syntax_node(SyntaxNode::new_vtable(vtable)), + self.nodes.push_syntax_node(SyntaxNode::new_close()), + ], + ); - return Ok(self.nodes.push_syntax_node(sn)); - } + return Ok(self.nodes.push_syntax_node(sn)); } if let Some(sq_id) = ir_plan.get_sub_query_from_row_node(id)? 
{ @@ -695,7 +690,7 @@ where Ok(()) } - fn empty(plan: &'p ExecutionPlan<'e>) -> Self { + fn empty(plan: &'p ExecutionPlan) -> Self { SyntaxPlan { nodes: SyntaxNodes::new(), top: None, @@ -703,7 +698,7 @@ where } } - pub fn new(plan: &'p ExecutionPlan<'e>, top: usize) -> Result<Self, QueryPlannerError> { + pub fn new(plan: &'p ExecutionPlan, top: usize) -> Result<Self, QueryPlannerError> { let mut sp = SyntaxPlan::empty(plan); let ir_plan = plan.get_ir_plan(); diff --git a/src/executor/engine/cartridge/backend/sql/tree/tests.rs b/src/executor/engine/cartridge/backend/sql/tree/tests.rs index 3dbd4bf5b1180fc18e64a3862f43e22e966634b1..a8e0b5ff24d78127efdfde74f323c4c348004714 100644 --- a/src/executor/engine/cartridge/backend/sql/tree/tests.rs +++ b/src/executor/engine/cartridge/backend/sql/tree/tests.rs @@ -39,10 +39,11 @@ fn sql_order_selection() { let s = fs::read_to_string(path).unwrap(); let expected_plan = Plan::from_yaml(&s).unwrap(); assert_eq!(expected_plan, plan); - let exec_plan = ExecutionPlan::from(&plan); + let exec_plan = ExecutionPlan::from(plan.clone()); + let top_id = exec_plan.get_ir_plan().get_top().unwrap(); // test the syntax plan - let sp = SyntaxPlan::new(&exec_plan, plan.get_top().unwrap()).unwrap(); + let sp = SyntaxPlan::new(&exec_plan, top_id).unwrap(); let path = Path::new("") .join("tests") .join("artifactory") @@ -53,10 +54,12 @@ fn sql_order_selection() { let s = fs::read_to_string(path).unwrap(); let expected_syntax_nodes = SyntaxNodes::from_yaml(&s).unwrap(); assert_eq!(expected_syntax_nodes, sp.nodes); - let exec_plan = ExecutionPlan::from(&plan); + + let exec_plan = ExecutionPlan::from(plan); + let top_id = exec_plan.get_ir_plan().get_top().unwrap(); // get nodes in the sql-convenient order - let nodes = exec_plan.get_sql_order(plan.get_top().unwrap()).unwrap(); + let nodes = exec_plan.get_sql_order(top_id).unwrap(); let mut nodes_iter = nodes.into_iter(); assert_eq!(Some(SyntaxData::PlanId(16)), nodes_iter.next()); // projection 
assert_eq!(Some(SyntaxData::PlanId(13)), nodes_iter.next()); // ref diff --git a/src/executor/engine/cartridge/bucket.rs b/src/executor/engine/cartridge/bucket.rs deleted file mode 100644 index fd59adf9b4b80489b282cfc20136545013ea2c3a..0000000000000000000000000000000000000000 --- a/src/executor/engine/cartridge/bucket.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::convert::TryInto; -use std::hash::Hasher; - -use fasthash::{murmur3::Hasher32, FastHasher}; - -/// Determine bucket value using `murmur3` hash function -pub(in crate::executor::engine::cartridge) fn str_to_bucket_id( - s: &str, - bucket_count: usize, -) -> usize { - let mut hash = Hasher32::new(); - hash.write(s.as_bytes()); - - let hash: usize = hash.finish().try_into().unwrap(); - hash % bucket_count + 1 -} - -#[cfg(test)] -mod tests; diff --git a/src/executor/engine/cartridge/hash.rs b/src/executor/engine/cartridge/hash.rs new file mode 100644 index 0000000000000000000000000000000000000000..d747f1d523c7b43644234485e19e898c309c84ca --- /dev/null +++ b/src/executor/engine/cartridge/hash.rs @@ -0,0 +1,12 @@ +use fasthash::{murmur3::Hasher32, FastHasher}; +use std::hash::Hasher; + +/// Determine bucket value using `murmur3` hash function +pub(in crate::executor::engine) fn str_to_bucket_id(s: &str, bucket_count: usize) -> u64 { + let mut hash = Hasher32::new(); + hash.write(s.as_bytes()); + hash.finish() % bucket_count as u64 + 1 +} + +#[cfg(test)] +mod tests; diff --git a/src/executor/engine/cartridge/bucket/tests.rs b/src/executor/engine/cartridge/hash/tests.rs similarity index 100% rename from src/executor/engine/cartridge/bucket/tests.rs rename to src/executor/engine/cartridge/hash/tests.rs diff --git a/src/executor/engine/mock.rs b/src/executor/engine/mock.rs index 4280872727f0b94cb1468e9a1b1ba16c76b32fc6..0456ef524e7e5f3a01d70843df2b9fda48d305d0 100644 --- a/src/executor/engine/mock.rs +++ b/src/executor/engine/mock.rs @@ -1,19 +1,19 @@ -use std::cell::RefCell; use std::collections::HashMap; use 
crate::errors::QueryPlannerError; +use crate::executor::bucket::Buckets; +use crate::executor::engine::cartridge::hash::str_to_bucket_id; use crate::executor::engine::Engine; use crate::executor::ir::ExecutionPlan; use crate::executor::result::{BoxExecuteFormat, Value}; use crate::executor::vtable::VirtualTable; use crate::executor::Metadata; -use crate::ir::operator::Relational; use crate::ir::relation::{Column, Table, Type}; -use crate::ir::value::Value as IrValue; #[derive(Debug, Clone)] pub struct MetadataMock { tables: HashMap<String, Table>, + bucket_count: usize, } impl Metadata for MetadataMock { @@ -101,24 +101,27 @@ impl MetadataMock { Table::new_seg("\"t\"", columns.clone(), sharding_key).unwrap(), ); - MetadataMock { tables } + MetadataMock { + tables, + bucket_count: 10000, + } } } #[derive(Debug, Clone)] pub struct EngineMock { metadata: MetadataMock, - pub query_result: RefCell<BoxExecuteFormat>, + virtual_tables: HashMap<usize, VirtualTable>, } impl Engine for EngineMock { type Metadata = MetadataMock; - fn metadata(&self) -> Self::Metadata + fn metadata(&self) -> &Self::Metadata where Self: Sized, { - self.metadata.clone() + &self.metadata } fn has_metadata(&self) -> bool { @@ -136,95 +139,85 @@ impl Engine for EngineMock { fn materialize_motion( &self, - plan: &mut ExecutionPlan, + _plan: &mut ExecutionPlan, motion_node_id: usize, + _buckets: &Buckets, ) -> Result<VirtualTable, QueryPlannerError> { - let sq_id = &plan.get_motion_child(motion_node_id)?; - - let mut vtable = VirtualTable::new(); - - if let Relational::ScanSubQuery { alias, .. } = - &plan.get_ir_plan().get_relation_node(*sq_id)? 
- { - if let Some(name) = alias { - vtable.set_alias(name)?; - } + if let Some(virtual_table) = self.virtual_tables.get(&motion_node_id) { + Ok(virtual_table.clone()) + } else { + Err(QueryPlannerError::CustomError( + "No virtual table found for motion node".to_string(), + )) } - - vtable.add_column(Column { - name: "identification_number".into(), - r#type: Type::Integer, - }); - - vtable.add_values_tuple(vec![IrValue::number_from_str("2")?]); - vtable.add_values_tuple(vec![IrValue::number_from_str("3")?]); - - Ok(vtable) } fn exec( &self, plan: &mut ExecutionPlan, top_id: usize, + buckets: &Buckets, ) -> Result<BoxExecuteFormat, QueryPlannerError> { + let mut result = BoxExecuteFormat::new(); let sql = plan.subtree_as_sql(top_id)?; - if let Some(shard_keys) = plan.discovery(top_id)? { - for shard in shard_keys { - self.exec_query(&shard, &sql)?; + match buckets { + Buckets::All => { + result.extend(cluster_exec_query(&sql)?)?; + } + Buckets::Filtered(list) => { + for bucket in list { + let temp_result = bucket_exec_query(*bucket, &sql)?; + result.extend(temp_result)?; + } } - } else { - self.mp_exec_query(&sql)?; } - let mut result = self.query_result.borrow_mut(); - - // sorting for test reproduce + // Sort results to make tests reproducible. 
result.rows.sort_by_key(|k| k[0].to_string()); - - Ok(result.clone()) + Ok(result) } - fn determine_bucket_id(&self, _s: &str) -> usize { - 1 + fn determine_bucket_id(&self, s: &str) -> u64 { + str_to_bucket_id(s, self.metadata.bucket_count) } } impl EngineMock { pub fn new() -> Self { - let result_cell = RefCell::new(BoxExecuteFormat { - metadata: vec![], - rows: vec![], - }); - EngineMock { metadata: MetadataMock::new(), - query_result: result_cell, + virtual_tables: HashMap::new(), } } - fn exec_query( - &self, - shard_key: &str, - query: &str, - ) -> Result<BoxExecuteFormat, QueryPlannerError> { - let mut result = self.query_result.borrow_mut(); + pub fn add_virtual_table( + &mut self, + id: usize, + table: VirtualTable, + ) -> Result<(), QueryPlannerError> { + self.virtual_tables.insert(id, table); + Ok(()) + } +} - result.rows.push(vec![ - Value::String(format!("query send to [{}] shard", shard_key)), - Value::String(String::from(query)), - ]); +fn bucket_exec_query(bucket: u64, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { + let mut result = BoxExecuteFormat::new(); - Ok(result.clone()) - } + result.rows.push(vec![ + Value::String(format!("Execute query on a bucket [{}]", bucket)), + Value::String(String::from(query)), + ]); - fn mp_exec_query(&self, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { - let mut result = self.query_result.borrow_mut(); + Ok(result.clone()) +} - result.rows.push(vec![ - Value::String(String::from("query send to all shards")), - Value::String(String::from(query)), - ]); - Ok(result.clone()) - } +fn cluster_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { + let mut result = BoxExecuteFormat::new(); + + result.rows.push(vec![ + Value::String(String::from("Execute query on all buckets")), + Value::String(String::from(query)), + ]); + Ok(result.clone()) } diff --git a/src/executor/ir.rs b/src/executor/ir.rs index 
e212dbdfe1a2376a17255e5818c45e0e89d02333..2a5c6535c25b51b66c1a58e89e089d8b0af6c907 100644 --- a/src/executor/ir.rs +++ b/src/executor/ir.rs @@ -8,13 +8,13 @@ use crate::ir::transformation::redistribution::MotionPolicy; use crate::ir::Plan; #[derive(Debug, Clone)] -pub struct ExecutionPlan<'e> { - plan: &'e Plan, +pub struct ExecutionPlan { + plan: Plan, vtables: Option<HashMap<usize, VirtualTable>>, } -impl<'e> From<&'e Plan> for ExecutionPlan<'e> { - fn from(plan: &'e Plan) -> Self { +impl From<Plan> for ExecutionPlan { + fn from(plan: Plan) -> Self { ExecutionPlan { plan, vtables: None, @@ -22,9 +22,14 @@ impl<'e> From<&'e Plan> for ExecutionPlan<'e> { } } -impl<'e> ExecutionPlan<'e> { +impl ExecutionPlan { pub fn get_ir_plan(&self) -> &Plan { - self.plan + &self.plan + } + + #[allow(dead_code)] + pub fn get_mut_ir_plan(&mut self) -> &mut Plan { + &mut self.plan } /// Add materialize motion result to translation map of virtual tables @@ -55,7 +60,7 @@ impl<'e> ExecutionPlan<'e> { /// Get motion virtual table pub fn get_motion_vtable(&self, motion_id: usize) -> Result<VirtualTable, QueryPlannerError> { - if let Some(vtable) = self.vtables.clone() { + if let Some(vtable) = &self.vtables { if let Some(result) = vtable.get(&motion_id) { return Ok(result.clone()); } diff --git a/src/executor/shard/tests.rs b/src/executor/shard/tests.rs deleted file mode 100644 index c4b8e447dc76a54356c1c285e1e638a9c961ded4..0000000000000000000000000000000000000000 --- a/src/executor/shard/tests.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::collections::HashSet; - -use pretty_assertions::assert_eq; - -use crate::executor::engine::mock::MetadataMock; -use crate::executor::ir::ExecutionPlan; -use crate::frontend::sql::ast::AbstractSyntaxTree; - -#[test] -fn simple_union_query() { - let query = r#"SELECT * FROM ( - SELECT * FROM "test_space" WHERE "sysFrom" > 0 - UNION ALL - SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0 - ) as "t3" - WHERE "id" = 1"#; - - let metadata = 
&MetadataMock::new(); - - let ast = AbstractSyntaxTree::new(query).unwrap(); - let mut plan = ast.to_ir(metadata).unwrap(); - plan.add_motions().unwrap(); - let mut ex_plan = ExecutionPlan::from(&plan); - - let top = plan.get_top().unwrap(); - // let expected = HashSet::from([3940]); - let expected = HashSet::from(["1".into()]); - assert_eq!(Some(expected), ex_plan.discovery(top).unwrap()) -} - -#[test] -fn simple_disjunction_in_union_query() { - let query = r#"SELECT * FROM ( - SELECT * FROM "test_space" WHERE "sysFrom" > 0 - UNION ALL - SELECT * FROM "test_space_hist" WHERE "sysFrom" < 0 - ) as "t3" - WHERE ("id" = 1) OR ("id" = 100)"#; - - let metadata = &MetadataMock::new(); - - let ast = AbstractSyntaxTree::new(query).unwrap(); - let mut plan = ast.to_ir(metadata).unwrap(); - plan.add_motions().unwrap(); - let mut ex_plan = ExecutionPlan::from(&plan); - - let top = plan.get_top().unwrap(); - // let expected = HashSet::from([3940, 18512]); - - let expected = HashSet::from(["1".into(), "100".into()]); - assert_eq!(Some(expected), ex_plan.discovery(top).unwrap()) -} - -#[test] -fn complex_shard_key_union_query() { - let query = r#"SELECT * -FROM - (SELECT "identification_number", "product_code" - FROM "hash_testing" - WHERE "sys_op" = 1 - UNION ALL - SELECT "identification_number", "product_code" - FROM "hash_testing_hist" - WHERE "sys_op" > 1) AS "t3" -WHERE "identification_number" = 1 AND "product_code" = '222'"#; - - let metadata = &MetadataMock::new(); - - let ast = AbstractSyntaxTree::new(query).unwrap(); - let mut plan = ast.to_ir(metadata).unwrap(); - plan.add_motions().unwrap(); - let mut ex_plan = ExecutionPlan::from(&plan); - - let top = plan.get_top().unwrap(); - - assert_eq!(None, ex_plan.discovery(top).unwrap()) -} - -#[test] -fn union_complex_cond_query() { - let query = r#"SELECT * -FROM - (SELECT "identification_number", "product_code" - FROM "hash_testing" - WHERE "sys_op" = 1 - UNION ALL - SELECT "identification_number", "product_code" - FROM 
"hash_testing_hist" - WHERE "sys_op" > 1) AS "t3" -WHERE ("identification_number" = 1 - OR ("identification_number" = 100 - OR "identification_number" = 1000)) - AND ("product_code" = '222' - OR "product_code" = '111')"#; - - let metadata = &MetadataMock::new(); - - let ast = AbstractSyntaxTree::new(query).unwrap(); - let mut plan = ast.to_ir(metadata).unwrap(); - plan.add_motions().unwrap(); - let mut ex_plan = ExecutionPlan::from(&plan); - - let top = plan.get_top().unwrap(); - - assert_eq!(None, ex_plan.discovery(top).unwrap()) -} diff --git a/src/executor/tests.rs b/src/executor/tests.rs index 772327c91cf27f9e6ca8674417bf8b93ba6af545..12fbe128ccc13f1703b179712c35bbbabad7a025 100644 --- a/src/executor/tests.rs +++ b/src/executor/tests.rs @@ -1,9 +1,10 @@ -use pretty_assertions::assert_eq; - +use super::*; use crate::executor::engine::mock::EngineMock; use crate::executor::result::Value; - -use super::*; +use crate::executor::vtable::VirtualTable; +use crate::ir::relation::{Column, Type}; +use crate::ir::value::Value as IrValue; +use pretty_assertions::assert_eq; #[test] fn shard_query() { @@ -11,13 +12,13 @@ fn shard_query() { let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket = query.engine.determine_bucket_id("1"); expected .rows .push(vec![ - Value::String(String::from("query send to [1] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket)), Value::String(String::from(r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME" FROM "test_space" WHERE ("test_space"."id") = (1)"#)) ]); assert_eq!(expected, query.exec().unwrap()) @@ -39,13 +40,13 @@ fn shard_union_query() { let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket = query.engine.determine_bucket_id("1"); expected .rows .push(vec![ - 
Value::String(String::from("query send to [1] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket)), Value::String( format!( "{} {}{} {} {}{} {}", @@ -69,11 +70,11 @@ fn map_reduce_query() { let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket = query.engine.determine_bucket_id(&["1", "457"].join("")); expected.rows.push(vec![ - Value::String(String::from("query send to all shards")), + Value::String(format!("Execute query on a bucket [{}]", bucket)), Value::String( format!( "{} {} {}", @@ -94,14 +95,22 @@ fn linker_test() { let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); + let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0]; + let mut virtual_table = virtual_table_23(); + virtual_table.set_alias("test").unwrap(); + query + .engine + .add_virtual_table(motion_id, virtual_table) + .unwrap(); let result = query.exec().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket2 = query.engine.determine_bucket_id("2"); + let bucket3 = query.engine.determine_bucket_id("3"); expected.rows.extend(vec![ vec![ - Value::String(String::from("query send to [2] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket3)), Value::String(format!( "{} {} {}", r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME""#, @@ -110,7 +119,7 @@ fn linker_test() { )), ], vec![ - Value::String(String::from("query send to [3] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket2)), Value::String(format!( "{} {} {}", r#"SELECT "test_space"."FIRST_NAME" as "FIRST_NAME""#, @@ -140,14 +149,22 @@ fn union_linker_test() { let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); + let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0]; + let mut virtual_table = 
virtual_table_23(); + virtual_table.set_alias("\"t2\"").unwrap(); + query + .engine + .add_virtual_table(motion_id, virtual_table) + .unwrap(); let result = query.exec().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket2 = query.engine.determine_bucket_id("2"); + let bucket3 = query.engine.determine_bucket_id("3"); expected.rows.extend(vec![ vec![ - Value::String(String::from("query send to [2] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket3)), Value::String( format!( "{} {}{} {} {} {} {} {} {}{} {}", @@ -166,7 +183,7 @@ fn union_linker_test() { ) ], vec![ - Value::String(String::from("query send to [3] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket2)), Value::String( format!( "{} {}{} {} {} {} {} {} {}{} {}", @@ -210,45 +227,27 @@ INNER JOIN FROM "hash_single_testing_hist" WHERE "sys_op" <= 0) AS "t8" ON "t3"."id" = "t8"."identification_number" -WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#; +WHERE "t3"."id" = 2 AND "t8"."identification_number" = 2"#; let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); + let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0]; + let mut virtual_table = virtual_table_23(); + virtual_table.set_alias("\"t8\"").unwrap(); + query + .engine + .add_virtual_table(motion_id, virtual_table) + .unwrap(); let result = query.exec().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket2 = query.engine.determine_bucket_id("2"); expected.rows.extend(vec![ vec![ - Value::String(String::from("query send to [1] shard")), - Value::String( - format!( - "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}", - r#"SELECT "t3"."id" as "id""#, - r#""t3"."FIRST_NAME" as "FIRST_NAME""#, - r#""t8"."identification_number" as "identification_number""#, - r#"FROM ("#, - r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#, - r#"FROM "test_space""#, - r#"WHERE 
("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#, - r#"UNION ALL"#, - r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#, - r#"FROM "test_space_hist""#, - r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#, - r#") as "t3""#, - r#"INNER JOIN"#, - r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#, - r#") as "t8""#, - r#"ON ("t3"."id") = ("t8"."identification_number")"#, - r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"# - ) - ) - ], - vec![ - Value::String(String::from("query send to [2] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket2)), Value::String( format!( "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}", @@ -258,7 +257,7 @@ WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#; r#"FROM ("#, r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#, r#"FROM "test_space""#, - r#"WHERE ("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#, + r#"WHERE (0) > ("test_space"."sys_op") and ("test_space"."sysFrom") >= (0)"#, r#"UNION ALL"#, r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#, r#"FROM "test_space_hist""#, @@ -267,33 +266,8 @@ WHERE "t3"."id" = 1 AND "t8"."identification_number" = 1"#; r#"INNER JOIN"#, r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#, r#") as "t8""#, - r#"ON ("t3"."id") = ("t8"."identification_number")"#, - r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"# - ) - ) - ], - vec![ - Value::String(String::from("query send to [3] shard")), - Value::String( - format!( - "{}, {}, {} {}{} {} {} {} {} {} {}{} {} {}{} {} {}", - r#"SELECT "t3"."id" as "id""#, - r#""t3"."FIRST_NAME" as "FIRST_NAME""#, - r#""t8"."identification_number" as "identification_number""#, - r#"FROM ("#, - r#"SELECT "test_space"."id" as "id", "test_space"."FIRST_NAME" as "FIRST_NAME""#, - r#"FROM "test_space""#, - r#"WHERE 
("test_space"."sys_op") < (0) and ("test_space"."sysFrom") >= (0)"#, - r#"UNION ALL"#, - r#"SELECT "test_space_hist"."id" as "id", "test_space_hist"."FIRST_NAME" as "FIRST_NAME""#, - r#"FROM "test_space_hist""#, - r#"WHERE ("test_space_hist"."sysFrom") <= (0)"#, - r#") as "t3""#, - r#"INNER JOIN"#, - r#"(SELECT COLUMN_2 as "identification_number" FROM (VALUES (2),(3))"#, - r#") as "t8""#, - r#"ON ("t3"."id") = ("t8"."identification_number")"#, - r#"WHERE ("t3"."id") = (1) and ("t8"."identification_number") = (1)"# + r#"ON ("t3"."id") = (SELECT COLUMN_4 as "identification_number" FROM (VALUES (2),(3)))"#, + r#"WHERE ("t3"."id", "t8"."identification_number") = (2, 2)"# ) ) ], @@ -311,14 +285,25 @@ fn anonymous_col_index_test() { let engine = EngineMock::new(); let mut query = Query::new(engine, sql).unwrap(); - query.optimize().unwrap(); + let motion1_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0]; + query + .engine + .add_virtual_table(motion1_id, virtual_table_23()) + .unwrap(); + let motion2_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][1]; + query + .engine + .add_virtual_table(motion2_id, virtual_table_23()) + .unwrap(); let result = query.exec().unwrap(); let mut expected = BoxExecuteFormat::new(); + let bucket2 = query.engine.determine_bucket_id("2"); + let bucket3 = query.engine.determine_bucket_id("3"); expected.rows.extend(vec![ vec![ - Value::String(String::from("query send to [2] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket3)), Value::String(format!( "{} {}, {}, {}, {}, {} {} {} {} {} {}", "SELECT", @@ -335,7 +320,7 @@ fn anonymous_col_index_test() { )), ], vec![ - Value::String(String::from("query send to [3] shard")), + Value::String(format!("Execute query on a bucket [{}]", bucket2)), Value::String(format!( "{} {}, {}, {}, {}, {} {} {} {} {} {}", "SELECT", @@ -355,3 +340,18 @@ fn anonymous_col_index_test() { assert_eq!(expected, result) } + +/// Helper function to create a "test" virtual 
table. +fn virtual_table_23() -> VirtualTable { + let mut virtual_table = VirtualTable::new(); + + virtual_table.add_column(Column { + name: "identification_number".into(), + r#type: Type::Integer, + }); + + virtual_table.add_values_tuple(vec![IrValue::number_from_str("2").unwrap()]); + virtual_table.add_values_tuple(vec![IrValue::number_from_str("3").unwrap()]); + + virtual_table +} diff --git a/src/executor/vtable.rs b/src/executor/vtable.rs index 45f04eb276acea54bd4de2783a38785b8afbedb3..260dd0addf911abe139a1c0c0baa05da82d22eac 100644 --- a/src/executor/vtable.rs +++ b/src/executor/vtable.rs @@ -77,8 +77,8 @@ impl VirtualTable { /// Get tuples was distributed by sharding keys #[must_use] - pub fn get_tuple_distribution(&self) -> HashMap<String, HashSet<usize>> { - self.hashing.clone() + pub fn get_tuple_distribution(&self) -> &HashMap<String, HashSet<usize>> { + &self.hashing } /// Distribute tuples by sharding key columns diff --git a/src/frontend/sql/ir/tests.rs b/src/frontend/sql/ir/tests.rs index 3889c984b3557ca1cb17e1ec3e8a87207c4c6586..b0388dcd1b6d1c092f4139234c72dc0185c0868b 100644 --- a/src/frontend/sql/ir/tests.rs +++ b/src/frontend/sql/ir/tests.rs @@ -13,9 +13,9 @@ fn simple_query_to_ir() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -38,9 +38,9 @@ fn complex_cond_query_transform() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = 
ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -70,9 +70,9 @@ fn simple_union_query_transform() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -109,9 +109,9 @@ WHERE ("identification_number" = 1 let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -139,9 +139,9 @@ fn sub_query_in_selection() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -166,9 +166,9 @@ fn inner_join() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -215,9 +215,9 @@ fn simple_query_with_unquoted_aliases() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + 
let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( @@ -256,9 +256,9 @@ fn inner_join_1() { let metadata = &MetadataMock::new(); let ast = AbstractSyntaxTree::new(query).unwrap(); let plan = ast.to_ir(metadata).unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( diff --git a/src/ir/transformation/bool_in/tests.rs b/src/ir/transformation/bool_in/tests.rs index 1b327495a616aca814c8f57c75c3232433623842..0b415c68893c9df79701357e416017c2964229c2 100644 --- a/src/ir/transformation/bool_in/tests.rs +++ b/src/ir/transformation/bool_in/tests.rs @@ -12,9 +12,9 @@ fn bool_in1() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.replace_in_operator().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -35,9 +35,9 @@ fn bool_in2() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.replace_in_operator().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -57,9 +57,9 @@ fn bool_in3() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.replace_in_operator().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - 
let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( diff --git a/src/ir/transformation/dnf.rs b/src/ir/transformation/dnf.rs index c3b1ee1702cb2bbb1cb02a1a43a333b578390d23..6e112fc7ee3409d0d137b5cdc9d109071fad5195 100644 --- a/src/ir/transformation/dnf.rs +++ b/src/ir/transformation/dnf.rs @@ -77,7 +77,7 @@ use std::collections::VecDeque; /// A chain of the trivalents (boolean or NULL expressions) concatenated by AND. #[derive(Clone, Debug)] -struct Chain { +pub struct Chain { nodes: VecDeque<usize>, } @@ -144,6 +144,11 @@ impl Chain { top_id.ok_or_else(|| QueryPlannerError::CustomError("Empty chain".into()))?; Ok(new_top_id) } + + /// Return a mutable reference to the chain nodes. + pub fn get_mut_nodes(&mut self) -> &mut VecDeque<usize> { + &mut self.nodes + } } fn call_expr_tree_to_dnf(plan: &mut Plan, top_id: usize) -> Result<usize, QueryPlannerError> { @@ -151,14 +156,12 @@ fn call_expr_tree_to_dnf(plan: &mut Plan, top_id: usize) -> Result<usize, QueryP } impl Plan { - /// Convert an expression tree of trivalent nodes to a disjunctive normal form (DNF). + /// Get the DNF "AND" chains from the expression tree. /// /// # Errors /// - If the expression tree is not a trivalent expression. /// - Failed to append node to the AND chain. - /// - Failed to convert the AND chain to a new expression tree. - /// - Failed to concatenate the AND expression trees to the OR tree. - pub fn expr_tree_to_dnf(&mut self, top_id: usize) -> Result<usize, QueryPlannerError> { + pub fn get_dnf_chains(&self, top_id: usize) -> Result<VecDeque<Chain>, QueryPlannerError> { let mut result: VecDeque<Chain> = VecDeque::new(); let mut stack: Vec<Chain> = Vec::new(); @@ -199,6 +202,18 @@ impl Plan { stack.push(chain); } + Ok(result) + } + + /// Convert an expression tree of trivalent nodes to a disjunctive normal form (DNF). 
+ /// + /// # Errors + /// - Failed to retrieve DNF chains. + /// - Failed to convert the AND chain to a new expression tree. + /// - Failed to concatenate the AND expression trees to the OR tree. + pub fn expr_tree_to_dnf(&mut self, top_id: usize) -> Result<usize, QueryPlannerError> { + let mut result = self.get_dnf_chains(top_id)?; + let mut new_top_id: Option<usize> = None; while let Some(mut chain) = result.pop_front() { let ir_chain_top = chain.as_plan(self)?; diff --git a/src/ir/transformation/dnf/tests.rs b/src/ir/transformation/dnf/tests.rs index ec46970c05029b6c4f752137e2e76f580ee5ff87..5a5e230b2b221a118953ab1c5e7e37158598a8c8 100644 --- a/src/ir/transformation/dnf/tests.rs +++ b/src/ir/transformation/dnf/tests.rs @@ -13,9 +13,9 @@ fn dnf1() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.set_dnf().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -37,9 +37,9 @@ fn dnf2() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.set_dnf().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -61,9 +61,9 @@ fn dnf3() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.set_dnf().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -84,9 +84,9 @@ fn dnf4() { let ast = 
AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.set_dnf().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -107,9 +107,9 @@ fn dnf5() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.set_dnf().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( diff --git a/src/ir/transformation/equality_propagation.rs b/src/ir/transformation/equality_propagation.rs index 910229acb247f7857d2742acd1bcc5c5192c27a1..4836ca20e01cef9e5fddafde854c136761aab6bc 100644 --- a/src/ir/transformation/equality_propagation.rs +++ b/src/ir/transformation/equality_propagation.rs @@ -324,9 +324,6 @@ impl Nodes { } } } - if tops.is_empty() { - return Err(QueryPlannerError::RedundantTransformation); - } Ok(tops) } diff --git a/src/ir/transformation/merge_tuples/tests.rs b/src/ir/transformation/merge_tuples/tests.rs index 8e8e1d8a2ffb421997ff352cdafd9631f63de0b3..1ec3f7c0e8283ad497211a01e28b1f58d72798cf 100644 --- a/src/ir/transformation/merge_tuples/tests.rs +++ b/src/ir/transformation/merge_tuples/tests.rs @@ -12,9 +12,9 @@ fn merge_tuples1() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.merge_tuples().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -36,9 +36,9 @@ fn merge_tuples2() { let ast = 
AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.merge_tuples().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -59,9 +59,9 @@ fn merge_tuples3() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.merge_tuples().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!("{}", r#"SELECT "t"."a" as "a" FROM "t" WHERE true"#,), @@ -77,9 +77,9 @@ fn merge_tuples4() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.merge_tuples().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -98,9 +98,9 @@ fn merge_tuples5() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.merge_tuples().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( diff --git a/src/ir/transformation/split_columns/tests.rs b/src/ir/transformation/split_columns/tests.rs index 26787ab27992634b5020f5eaf3b4f5b9d5147262..040b4e0e7c0a3f6df5b13f4ba3d674e862125eca 100644 --- a/src/ir/transformation/split_columns/tests.rs +++ b/src/ir/transformation/split_columns/tests.rs @@ 
-12,9 +12,9 @@ fn split_columns1() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.split_columns().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -33,9 +33,9 @@ fn split_columns2() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.split_columns().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -73,9 +73,9 @@ fn split_columns4() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.split_columns().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( @@ -94,9 +94,9 @@ fn split_columns5() { let ast = AbstractSyntaxTree::new(query).unwrap(); let mut plan = ast.to_ir(metadata).unwrap(); plan.split_columns().unwrap(); - let ex_plan = ExecutionPlan::from(&plan); + let ex_plan = ExecutionPlan::from(plan); - let top_id = plan.get_top().unwrap(); + let top_id = ex_plan.get_ir_plan().get_top().unwrap(); let sql = ex_plan.subtree_as_sql(top_id).unwrap(); assert_eq!( format!( diff --git a/src/ir/value.rs b/src/ir/value.rs index ce525ae9818acfdd1116de804eb8598480fe0d9b..2ddccbf14e5410d1ac2c0f1cd0806bf2a97cdd88 100644 --- a/src/ir/value.rs +++ b/src/ir/value.rs @@ -107,6 +107,17 @@ impl Value { } } +impl From<Value> for String { + fn from(v: Value) -> Self { + match v { + 
Value::Boolean(b) => b.to_string(), + Value::Null => "NULL".to_string(), + Value::Number(n) => n.to_string(), + Value::String(s) => s, + } + } +} + impl From<Trivalent> for Value { fn from(f: Trivalent) -> Self { match f { diff --git a/src/parser.rs b/src/parser.rs index 0f2ee65fee312c2ac34ccd88939ac63a1bb3ef6d..12041bbc0188003743248bfebb1c983190b5bb31 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -90,10 +90,6 @@ pub extern "C" fn execute_query(ctx: FunctionCtx, args: FunctionArgs) -> c_int { } }; - if let Err(e) = query.optimize() { - return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string()); - } - match query.exec() { Ok(q) => { ctx.return_mp(&q).unwrap();