diff --git a/src/errors.rs b/src/errors.rs
index 6d83c92becf209f6db99e6bf0c95ee4956a6c255..b4391bdddebf4929bf69b5a191ed80ceab548b1e 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -5,7 +5,6 @@
 const SIMPLE_QUERY_ERROR: &str = "query doesn't simple";
 const SIMPLE_UNION_QUERY_ERROR: &str = "query doesn't simple union";
 const QUERY_NOT_IMPLEMENTED: &str = "query wasn't s implemented";
 const BUCKET_ID_ERROR: &str = "field doesn't contains sharding key value";
-const SHARDING_KEY_FILTER_ERROR: &str = "query doesn't have sharding key filter";
 #[derive(Debug, Clone, PartialEq, Serialize)]
 pub enum QueryPlannerError {
@@ -13,7 +12,6 @@ pub enum QueryPlannerError {
     SimpleUnionQueryError,
     QueryNotImplemented,
     BucketIdError,
-    ShardingKeyFilterError
 }
 
 impl fmt::Display for QueryPlannerError {
@@ -23,7 +21,6 @@ impl fmt::Display for QueryPlannerError {
             QueryPlannerError::SimpleUnionQueryError => SIMPLE_UNION_QUERY_ERROR,
             QueryPlannerError::QueryNotImplemented => QUERY_NOT_IMPLEMENTED,
             QueryPlannerError::BucketIdError => BUCKET_ID_ERROR,
-            QueryPlannerError::ShardingKeyFilterError => SHARDING_KEY_FILTER_ERROR
         };
         write!(f, "{}", p)
     }
@@ -63,13 +60,4 @@ impl fmt::Display for BucketIdError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "{}", BUCKET_ID_ERROR)
     }
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize)]
-pub struct ShardingKeyFilterError;
-
-impl fmt::Display for ShardingKeyFilterError {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", SHARDING_KEY_FILTER_ERROR)
-    }
 }
\ No newline at end of file
diff --git a/src/query.rs b/src/query.rs
index db09d067e7583e28e46d756393d9c153367ac87e..beefb1fd19a66e76d5add3cd6aa4111797aa73c4 100644
--- a/src/query.rs
+++ b/src/query.rs
@@ -110,21 +110,18 @@ impl UserQuery {
             TableFactor::Table { name, alias: _, args: _, with_hints: _ } => {
                 let table = name.to_string().replace("\"", "");
                 self.schema.to_owned().get_sharding_key_by_space(&table)
-            },
+            }
             _ => return Err(QueryPlannerError::SimpleQueryError)
         };
 
         let mut result = ShardInfo::from(sharding_key);
         let filters = select_query.to_owned().selection.unwrap();
 
-        let sharding_key_parts = match extract_sharding_key_values(&filters, &result.sharding_keys) {
+        let sharding_key_values = match extract_sharding_key_values(&filters, &result.sharding_keys) {
             Ok(r) => r,
             Err(e) => return Err(e),
         };
 
-        match result.set_sharding_key_values(sharding_key_parts) {
-            Err(e) => return Err(e),
-            _ => ()
-        };
+        result.keys = sharding_key_values;
 
         Ok(result)
     }
@@ -145,48 +142,13 @@ impl From<Vec<String>> for ShardInfo {
     }
 }
 
-impl ShardInfo {
-    /// Function transform parts sharding key `query_values` to result sharding key value.
-    /// For example there is sharding key parts `query_values`:
-    ///     {"col1": [1, 3], "col2": [2, 4]}
-    ///
-    /// Function result for example above will be:
-    ///     [{ "col1": 1, "col2": 2}, { "col1": 4, "col2": 4}]
-    ///
-    /// If parts key in `query_values` have different count of values, then function works
-    /// with shortest from it's, because it count equals count of full sharding key value.
-    fn set_sharding_key_values(&mut self, query_values: HashMap<String, Vec<String>>) -> Result<(), QueryPlannerError> {
-        let mut count_sharding_key_vals = usize::MAX;
-        for (_, v) in &query_values {
-            if count_sharding_key_vals > v.len() {
-                count_sharding_key_vals = v.len();
-            }
-        }
-
-        if count_sharding_key_vals == usize::MAX {
-            return Err(QueryPlannerError::ShardingKeyFilterError);
-        }
-
-        // loop needs for transforming parts of sharding key to sharding key values, for example:
-        for i in 0..count_sharding_key_vals {
-            let mut tmp = HashMap::new();
-            for k in self.sharding_keys.iter() {
-                tmp.insert(k.to_string(), query_values[k][i].clone());
-            }
-            self.keys.push(tmp);
-        }
-
-        Ok(())
-    }
-}
-
-/// Function extract part of sharding key from `e` condition, and append it to map of sharding key parts (`out_map`).
+/// Function extracts sharding key values from the query filter clause.
 ///
-/// For example there is condition (sharding key is <col1, col2>):
+/// For example, take the following clause (the sharding key is <col1, col2>):
 ///
 ///     (col1 = 1 and col2 = 2) OR (col1 = 3 and col2 = 4)
 ///
-/// This condition will parse to recursive structure (`e: Expr`):
+/// This condition is parsed into the AST (`e: Expr`):
 ///             BinaryOperator::Or
 ///            /                  \
 ///     Expr::Nested            Expr::Nested
@@ -198,36 +160,102 @@ impl ShardInfo {
 ///        col1   1   col2   2        col1   3   col2   4
 ///
 ///
-/// for above condition example `out_map` will be:
+/// For the AST above the function returns the list of sharding key values:
+///     [
+///         {"col1":"1", "col2":"2"},
+///         {"col1":"3", "col2":"4"}
+///     ]
 ///
-/// {
-///     "col1": [1, 3],
-///     "col2": [2, 4],
-/// }
+/// Another example is the clause:
+///     ("col1" = 1 OR ("col1" = 2 OR "col1" = 3)) AND ("col2" = 4 OR "col2" = 5)
 ///
-/// Variable `out_rec_count` contains count of sharding key values from query condition.
-fn extract_sharding_key_values(e: &Expr, sharding_key: &Vec<String>) -> Result<HashMap<String, Vec<String>>, QueryPlannerError> {
-    let mut result: HashMap<String, Vec<String>> = HashMap::new();
-    for r in sharding_key {
-        result.insert(r.to_string(), Vec::new());
-    }
+/// The AST in this case is:
+///
+///                      BinaryOperator::AND
+///                     /                    \
+///            Expr::Nested                Expr::Nested
+///                /                            \
+///       BinaryOperator::Or                BinaryOperator::Or
+///          /         \                      /         \
+///  BinaryOperator::Eq  Expr::Nested   BinaryOperator::Eq  BinaryOperator::Eq
+///     /    \              \              /    \              /    \
+///   col1    1       BinaryOperator::Or col2    4           col2    5
+///                      /           \
+///          BinaryOperator::Eq   BinaryOperator::Eq
+///             /    \               /    \
+///           col1    2            col1    3
+///
+/// The function transforms this AST into the list:
+///     [
+///         {"col1":"1", "col2":"4"},
+///         {"col1":"1", "col2":"5"},
+///         {"col1":"2", "col2":"4"},
+///         {"col1":"2", "col2":"5"},
+///         {"col1":"3", "col2":"4"},
+///         {"col1":"3", "col2":"5"}
+///     ]
+
+fn extract_sharding_key_values(e: &Expr, sharding_key: &Vec<String>) -> Result<Vec<HashMap<String, String>>, QueryPlannerError> {
+    let mut result = Vec::new();

     match e {
         Expr::BinaryOp { left, op, right } => match op {
             BinaryOperator::Eq => {
+                let mut shard_key_value = HashMap::new();
+
                 let field_name = left.to_string().to_lowercase().replace("\"", "");
-                if let Some(key) = result.get_mut(&field_name) {
-                    key.push(right.to_string().replace("\"", ""));
+
+                for shard_key_part in sharding_key {
+                    if shard_key_part == &field_name {
+                        shard_key_value.insert(field_name.to_string(), right.to_string().replace("\"", ""));
+                    }
+                }
+
+                result.push(shard_key_value);
+            }
+
+            BinaryOperator::And => {
+                // The `AND` operator requires a cross join of the children's results,
+                // because they contain sharding key parts (see the example AST in the function docs).
+
+                let l_leaf = match extract_sharding_key_values(&left, sharding_key) {
+                    Ok(res) => res,
+                    Err(e) => return Err(e)
+                };
+
+                let r_leaf = match extract_sharding_key_values(&right, sharding_key) {
+                    Ok(res) => res,
+                    Err(e) => return Err(e)
+                };
+
+                if l_leaf.is_empty() {
+                    result.extend(r_leaf);
+                    return Ok(result);
+                }
+
+                if r_leaf.is_empty() {
+                    result.extend(l_leaf);
+                    return Ok(result);
+                }
+
+                // Cross join the two hashmap vectors to get all combinations of the complex sharding key values.
+                for i in &l_leaf {
+                    for j in &r_leaf {
+                        let mut v = i.to_owned();
+                        v.extend(j.to_owned());
+                        result.push(v);
+                    }
                 }
             }
-            BinaryOperator::And | BinaryOperator::Or => {
+            BinaryOperator::Or => {
+                // The `OR` operator requires a union of the children's results,
+                // as they contain full sharding key values (see the example AST in the function docs).
+
                 match extract_sharding_key_values(&left, sharding_key) {
                     Ok(res) => {
-                        for (k, val) in res {
-                            if let Some(key) = result.get_mut(&k) {
-                                key.extend(val);
-                            }
+                        for k in res {
+                            result.push(k);
                         }
                     }
                     Err(e) => return Err(e)
@@ -235,10 +263,8 @@ fn extract_sharding_key_values(e: &Expr, sharding_key: &Vec<String>) -> Result<H
                 match extract_sharding_key_values(&right, sharding_key) {
                     Ok(res) => {
-                        for (k, val) in res {
-                            if let Some(key) = result.get_mut(&k) {
-                                key.extend(val);
-                            }
+                        for k in res {
+                            result.push(k);
                         }
                     }
                     Err(e) => return Err(e)
@@ -454,25 +480,25 @@ mod tests {
         SELECT * FROM \"complex_idx_test\" WHERE \"sys_op\" < 0
         ) as \"t3\"
         WHERE (\"identification_number\" = 1 AND \"product_code\" = \"222\")
-        AND (\"identification_number\" = 100 AND \"product_code\" = \"111\")
+        OR (\"identification_number\" = 100 AND \"product_code\" = \"111\")
         ";
 
         let mut expected_result = Vec::new();
         expected_result.push(QueryResult {
             bucket_id: 2926,
-            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") AND (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" > 0)".to_string(),
+            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") OR (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" > 0)".to_string(),
         });
         expected_result.push(QueryResult {
             bucket_id: 4202,
-            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") AND (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" > 0)".to_string(),
+            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") OR (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" > 0)".to_string(),
         });
         expected_result.push(QueryResult {
             bucket_id: 2926,
-            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") AND (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" < 0)".to_string(),
+            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") OR (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" < 0)".to_string(),
         });
         expected_result.push(QueryResult {
             bucket_id: 4202,
-            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") AND (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" < 0)".to_string(),
+            node_query: "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 AND \"product_code\" = \"222\") OR (\"identification_number\" = 100 AND \"product_code\" = \"111\")) AND (\"sys_op\" < 0)".to_string(),
         });
 
         let q = UserQuery::new(test_query, s.clone(), 30000).unwrap();
@@ -519,6 +545,38 @@ mod tests {
         assert_eq!(q.transform().unwrap(), expected_result)
     }
 
+    #[test]
+    fn test_transform_complex_in_cond() {
+        let test_query = "SELECT * FROM (
+            SELECT * FROM \"complex_idx_test\" WHERE \"sys_from\" <= 0 AND \"sys_to\" >= 0
+            UNION ALL
+            SELECT * FROM \"complex_idx_test\" WHERE \"sys_from\" <= 0
+        ) AS \"t3\"
+        WHERE (\"identification_number\" = 1 OR (\"identification_number\" = 100 OR \"identification_number\" = 1000))
+        AND (\"product_code\" = \"222\" OR \"product_code\" = \"111\")
+        ";
+
+        let first_sub_query = "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 OR (\"identification_number\" = 100 OR \"identification_number\" = 1000)) AND (\"product_code\" = \"222\" OR \"product_code\" = \"111\")) AND (\"sys_from\" <= 0 AND \"sys_to\" >= 0)".to_string();
+        let second_sub_query = "SELECT * FROM \"complex_idx_test\" WHERE ((\"identification_number\" = 1 OR (\"identification_number\" = 100 OR \"identification_number\" = 1000)) AND (\"product_code\" = \"222\" OR \"product_code\" = \"111\")) AND (\"sys_from\" <= 0)".to_string();
+        let mut expected_result = Vec::new();
+        expected_result.push(QueryResult { bucket_id: 2926, node_query: first_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 22115, node_query: first_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 6672, node_query: first_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 4202, node_query: first_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 23259, node_query: first_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 6557, node_query: first_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 2926, node_query: second_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 22115, node_query: second_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 6672, node_query: second_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 4202, node_query: second_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 23259, node_query: second_sub_query.clone() });
+        expected_result.push(QueryResult { bucket_id: 6557, node_query: second_sub_query.clone() });
+
+        let s = ClusterSchema::from(TEST_SCHEMA.to_string());
+        let q = UserQuery::new(test_query, s, 30000).unwrap();
+        assert_eq!(q.transform().unwrap(), expected_result)
+    }
+
     #[test]
     fn test_unsupported_query() {
         let s = ClusterSchema::from(TEST_SCHEMA.to_string());