diff --git a/doc/sql/query.ebnf b/doc/sql/query.ebnf index cb3afc84d40ef999fdb78ce087ec903000452747..1c6cd9990e011a19a84b0cc193488544afafcf15 100644 --- a/doc/sql/query.ebnf +++ b/doc/sql/query.ebnf @@ -16,6 +16,7 @@ select ::= 'SELECT' 'DISTINCT'? projection (',' projection)* 'FROM' scan ('HAVING' expression)? ('ORDER' 'BY' expression ('ASC' | 'DESC')? (',' expression ('ASC' | 'DESC')?)*)? (('UNION' 'ALL'? | 'EXCEPT' 'DISTINCT'?) select)? + (LIMIT (unsigned | "ALL" | "NULL"))? projection ::= (table '.')? '*' | expression (('AS')? name)? | aggregate scan ::= (table | '(' (query | values) ')') ('AS'? name)? expression ::= ('NOT'* ( diff --git a/sbroad-cartridge/test_app/test/integration/limit_test.lua b/sbroad-cartridge/test_app/test/integration/limit_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..6a4643e25688ecf7163eb1241fbe2ede5cadc983 --- /dev/null +++ b/sbroad-cartridge/test_app/test/integration/limit_test.lua @@ -0,0 +1,83 @@ +local t = require('luatest') +local g = t.group('integration_api.limit') + +local helper = require('test.helper.cluster_no_replication') + +g.before_all( + function() + helper.start_test_cluster(helper.cluster_config) + local api = helper.cluster:server("api-1").net_box + + local r, err = api:call("sbroad.execute", { [[ + INSERT INTO "t"("id", "a") + VALUES (1, 1), (2, 2), (3, 3), (4, 4), (5, 5) + ]], }) + t.assert_equals(err, nil) + t.assert_equals(r, {row_count = 5}) + end +) + +g.after_all(function() + helper.stop_test_cluster() +end) + +g.test_limit = function() + local r, err + local api = helper.cluster:server("api-1").net_box + + -- select with limit + r, err = api:call("sbroad.execute", { [[ + SELECT "a" FROM "t" LIMIT 2 + ]], }) + t.assert_equals(err, nil) + t.assert_items_equals(r["metadata"], { {name = "a", type = "number"} }) + t.assert_equals(#r["rows"], 2) + + -- order by with limit + r, err = api:call("sbroad.execute", { [[ + SELECT "id" FROM "t" ORDER BY "id" LIMIT 2 + ]], }) + t.assert_equals(err, nil) + t.assert_items_equals(r["metadata"], { {name = "id", type = "integer"} }) + t.assert_items_equals(r["rows"], { {1}, {2} }) + + -- aggregate and group by with limit + r, err = api:call("sbroad.execute", { [[ + SELECT count(*) FROM "t" GROUP BY "id" LIMIT 3 + ]], }) + t.assert_equals(err, nil) + t.assert_items_equals(r["metadata"], { {name = "COL_1", type = "integer"} }) + t.assert_equals(#r["rows"], 3) + + -- cte with limit + r, err = api:call("sbroad.execute", { [[ + WITH cte (b) AS (SELECT "a" FROM "t" ORDER BY "a" LIMIT 2) + SELECT b FROM cte + UNION ALL + SELECT b FROM cte + ]], }) + t.assert_equals(err, nil) + t.assert_items_equals(r["metadata"], { {name = "B", type = "number"} }) + t.assert_items_equals(r["rows"], { {1}, {2}, {1}, {2} }) + + -- cte with limit + r, err = api:call("sbroad.execute", { [[ + WITH cte (b) AS (SELECT "a" FROM "t" ORDER BY "a" LIMIT 2) + SELECT b FROM cte + UNION ALL + SELECT b FROM cte + LIMIT 1 + ]], }) + t.assert_equals(err, nil) + t.assert_items_equals(r["metadata"], { {name = "B", type = "number"} }) + t.assert_equals(#r["rows"], 1) + + -- limit in a subquery + r, err = api:call("sbroad.execute", { [[ + SELECT "a" FROM (SELECT "a" FROM "t" LIMIT 1) + ]], }) + t.assert_equals(err, nil) + t.assert_items_equals(r["metadata"], { {name = "a", type = "number"} }) + t.assert_equals(#r["rows"], 1) +end + diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs index f8bea9cf41169d8baf71a17c28c06c19ffe40daf..1bcd1478e8b2a2238a8787aef2e2eac9eaf806ff 100644 --- a/sbroad-core/src/backend/sql/ir.rs +++ b/sbroad-core/src/backend/sql/ir.rs @@ -300,6 +300,7 @@ impl ExecutionPlan { SyntaxData::Inline(content) => sql.push_str(content), SyntaxData::From => sql.push_str("FROM"), SyntaxData::Leading => sql.push_str("LEADING"), + SyntaxData::Limit(limit) => sql.push_str(&format_smolstr!("LIMIT {limit}")), SyntaxData::Both => sql.push_str("BOTH"), SyntaxData::Trailing => sql.push_str("TRAILING"), SyntaxData::Operator(s) => sql.push_str(s.as_str()), @@ -366,7 +367,8 @@ impl ExecutionPlan { Relational::ScanSubQuery { .. } | Relational::ScanCte { .. } | Relational::Motion { .. } - | Relational::ValuesRow { .. } => {} + | Relational::ValuesRow { .. } + | Relational::Limit { .. } => {} Relational::Selection { .. } => sql.push_str("WHERE"), Relational::Union { .. } => sql.push_str("UNION"), Relational::UnionAll { .. } => sql.push_str("UNION ALL"), diff --git a/sbroad-core/src/backend/sql/tree.rs b/sbroad-core/src/backend/sql/tree.rs index 64df3339aaf3d415a860277ee6f9f548bb2d20cc..bbc8284897cd8a0f26713290c08869bc640f4f63 100644 --- a/sbroad-core/src/backend/sql/tree.rs +++ b/sbroad-core/src/backend/sql/tree.rs @@ -57,6 +57,8 @@ pub enum SyntaxData { From, /// "leading" Leading, + /// "limit" + Limit(u64), /// "both" Both, /// "trailing" @@ -240,6 +242,14 @@ impl SyntaxNode { } } + fn new_limit(limit: u64) -> Self { + SyntaxNode { + data: SyntaxData::Limit(limit), + left: None, + right: Vec::new(), + } + } + fn new_both() -> Self { SyntaxNode { data: SyntaxData::Both, @@ -623,6 +633,7 @@ impl<'p> SyntaxPlan<'p> { Relational::Motion { .. } => self.add_motion(id), Relational::ValuesRow { .. } => self.add_values_row(id), Relational::Values { .. } => self.add_values(id), + Relational::Limit { .. } => self.add_limit(id), }, Node::Expression(expr) => match expr { Expression::ExprInParentheses { .. } => self.add_expr_in_parentheses(id), @@ -1063,6 +1074,22 @@ impl<'p> SyntaxPlan<'p> { arena.push_sn_plan(sn); } + fn add_limit(&mut self, id: usize) { + let (_, limit) = self.prologue_rel(id); + let Relational::Limit { limit, child, .. } = limit else { + panic!("expected LIMIT node"); + }; + let (limit, child) = (*limit, *child); + let child_sn_id = self.pop_from_stack(child); + let arena = &mut self.nodes; + let children: Vec<usize> = vec![ + child_sn_id, + arena.push_sn_non_plan(SyntaxNode::new_limit(limit)), + ]; + let sn = SyntaxNode::new_pointer(id, None, children); + arena.push_sn_plan(sn); + } + // Expression nodes. fn add_alias(&mut self, id: usize) { diff --git a/sbroad-core/src/executor/bucket.rs b/sbroad-core/src/executor/bucket.rs index 34b397ba1311d482c55a7a99ceb8ff4f9ff9424b..30db0c93d85e7af6010050ccad991b694e04741c 100644 --- a/sbroad-core/src/executor/bucket.rs +++ b/sbroad-core/src/executor/bucket.rs @@ -420,7 +420,8 @@ where | Relational::Having { output, .. } | Relational::OrderBy { output, .. } | Relational::ScanCte { output, .. } - | Relational::ScanSubQuery { output, .. } => { + | Relational::ScanSubQuery { output, .. } + | Relational::Limit { output, .. } => { let child_id = ir_plan.get_relational_child(node_id, 0)?; let child_rel = ir_plan.get_relation_node(child_id)?; let child_buckets = self diff --git a/sbroad-core/src/executor/ir.rs b/sbroad-core/src/executor/ir.rs index 719e6e4c10dcb645b316aa55b276912d6e76724e..073d51b59f3e55584cf25d51683373ed8d026f5e 100644 --- a/sbroad-core/src/executor/ir.rs +++ b/sbroad-core/src/executor/ir.rs @@ -321,7 +321,8 @@ impl ExecutionPlan { | Relational::Update { .. } | Relational::Values { .. } | Relational::Having { .. } - | Relational::ValuesRow { .. } => Ok(*top_id), + | Relational::ValuesRow { .. } + | Relational::Limit { .. } => Ok(*top_id), Relational::Motion { .. } | Relational::Insert { .. } | Relational::Delete { .. } => { Err(SbroadError::Invalid( Entity::Relational, @@ -584,7 +585,8 @@ impl ExecutionPlan { | Relational::ScanCte { .. } | Relational::Union { .. } | Relational::UnionAll { .. } - | Relational::Values { .. } => {} + | Relational::Values { .. } + | Relational::Limit { .. } => {} } for child_id in rel.mut_children() { diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs index 266a56642e63e8c4b5ab2519adf2978863c17c02..8eca4ed724a48da7fd2621f1114ff7540d900af4 100644 --- a/sbroad-core/src/frontend/sql.rs +++ b/sbroad-core/src/frontend/sql.rs @@ -848,6 +848,7 @@ fn parse_select_full( ast: &AbstractSyntaxTree, node_id: usize, map: &mut Translation, + plan: &mut Plan, ) -> Result<(), SbroadError> { let node = ast.nodes.get_node(node_id)?; assert_eq!(node.rule, Rule::SelectFull); @@ -856,16 +857,50 @@ fn parse_select_full( let child_node = ast.nodes.get_node(*child_id)?; match child_node.rule { Rule::Cte => continue, + Rule::SelectStatement => { + top_id = Some(parse_select_statement(ast, *child_id, map, plan)?); + } + _ => unreachable!("Unexpected node: {child_node:?}"), + } + } + let top_id = top_id.expect("SelectFull must have at least one child"); + map.add(node_id, top_id); + Ok(()) +} + +fn parse_select_statement( + ast: &AbstractSyntaxTree, + node_id: usize, + map: &mut Translation, + plan: &mut Plan, +) -> Result<usize, SbroadError> { + let node = ast.nodes.get_node(node_id)?; + assert_eq!(node.rule, Rule::SelectStatement); + let mut top_id = None; + let mut limit = None; + for child_id in &node.children { + let child_node = ast.nodes.get_node(*child_id)?; + match child_node.rule { Rule::SelectWithOptionalContinuation => { let select_id = map.get(*child_id)?; top_id = Some(select_id); } + Rule::Limit => { + let child_node = ast.nodes.get_node(child_node.children[0])?; + match child_node.rule { + Rule::Unsigned => limit = Some(parse_unsigned(child_node)?), + Rule::LimitAll => (), // LIMIT ALL is the same as omitting the LIMIT clause + _ => unreachable!("Unexpected limit child: {child_node:?}"), + } + } _ => unreachable!("Unexpected node: {child_node:?}"), } } - let top_id = top_id.expect("Select must contain at least one child"); - map.add(node_id, top_id); - Ok(()) + let top_id = top_id.expect("SelectStatement must have at least one child"); + if let Some(limit) = limit { + return plan.add_limit(top_id, limit); + } + Ok(top_id) } fn parse_scan_cte_or_table<M>( @@ -933,10 +968,14 @@ fn parse_cte( let column_name = parse_normalized_identifier(ast, *child_id)?; columns.push(column_name); } - Rule::SelectWithOptionalContinuation | Rule::Values => { + Rule::Values => { let select_id = map.get(*child_id)?; top_id = Some(select_id); } + Rule::SelectStatement => { + let select_id = parse_select_statement(ast, *child_id, map, plan)?; + top_id = Some(select_id); + } _ => unreachable!("Unexpected node: {child_node:?}"), } } @@ -1147,6 +1186,25 @@ fn parse_trim<M: Metadata>( Ok(trim) } +fn parse_unsigned(ast_node: &ParseNode) -> Result<u64, SbroadError> { + assert!(matches!(ast_node.rule, Rule::Unsigned)); + if let Some(str_value) = ast_node.value.as_ref() { + str_value.parse::<u64>().map_err(|_| { + SbroadError::Invalid( + Entity::Query, + Some(format_smolstr!( + "option value is not unsigned integer: {str_value}" + )), + ) + }) + } else { + Err(SbroadError::Invalid( + AST, + Some("Unsigned node has value".into()), + )) + } +} + /// Common logic for `SqlVdbeMaxSteps` and `VTableMaxRows` parsing. fn parse_option<M: Metadata>( ast: &AbstractSyntaxTree, @@ -1168,23 +1226,7 @@ fn parse_option<M: Metadata>( OptionParamValue::Parameter { plan_id } } Rule::Unsigned => { - let v = { - if let Some(str_value) = ast_node.value.as_ref() { - str_value.parse::<u64>().map_err(|_| { - SbroadError::Invalid( - Entity::Query, - Some(format_smolstr!( - "option value is not unsigned integer: {str_value}" - )), - ) - })? - } else { - return Err(SbroadError::Invalid( - AST, - Some("Unsigned node has value".into()), - )); - } - }; + let v = parse_unsigned(ast_node)?; OptionParamValue::Value { val: Value::Unsigned(v), } @@ -2908,7 +2950,7 @@ impl AbstractSyntaxTree { map.add(id, select_plan_node_id); } Rule::SelectFull => { - parse_select_full(self, id, &mut map)?; + parse_select_full(self, id, &mut map, &mut plan)?; } Rule::Projection => { let (rel_child_id, other_children) = node diff --git a/sbroad-core/src/frontend/sql/ir.rs b/sbroad-core/src/frontend/sql/ir.rs index 33b9e01bb59c6c1ebd36dfcaa43e4e4854f5acee..284e966c0485b645e5a933f189c913a4a11e6c90 100644 --- a/sbroad-core/src/frontend/sql/ir.rs +++ b/sbroad-core/src/frontend/sql/ir.rs @@ -606,6 +606,11 @@ impl SubtreeCloner { left: _, right: _, output: _, + } + | Relational::Limit { + limit: _, + child: _, + output: _, } => {} Relational::Having { children: _, diff --git a/sbroad-core/src/frontend/sql/ir/tests.rs b/sbroad-core/src/frontend/sql/ir/tests.rs index 7d752c31bbec6f38b89373ee3b1891bb7f3954bb..14a19919e49b3b83a32963113fc7736e5b3587f6 100644 --- a/sbroad-core/src/frontend/sql/ir/tests.rs +++ b/sbroad-core/src/frontend/sql/ir/tests.rs @@ -3744,6 +3744,8 @@ mod insert; #[cfg(test)] mod join; #[cfg(test)] +mod limit; +#[cfg(test)] mod params; #[cfg(test)] mod single; diff --git a/sbroad-core/src/frontend/sql/ir/tests/cte.rs b/sbroad-core/src/frontend/sql/ir/tests/cte.rs index ca97c9bb27eed1d224dd32e9a9d8fa855b96e028..2d612e183728b6d4b1eb27a65d4fd98414880591 100644 --- a/sbroad-core/src/frontend/sql/ir/tests/cte.rs +++ b/sbroad-core/src/frontend/sql/ir/tests/cte.rs @@ -121,12 +121,14 @@ fn reuse_union_in_cte() { projection ("CTE"."A"::string -> "A") scan cte "CTE"($0) subquery $0: -motion [policy: full] - union - projection ("test_space"."FIRST_NAME"::string -> "FIRST_NAME") - scan "test_space" - projection ("test_space"."FIRST_NAME"::string -> "FIRST_NAME") - scan "test_space" +projection ("CTE"."FIRST_NAME"::string -> "A") + scan "CTE" + motion [policy: full] + union + projection ("test_space"."FIRST_NAME"::string -> "FIRST_NAME") + scan "test_space" + projection ("test_space"."FIRST_NAME"::string -> "FIRST_NAME") + scan "test_space" execution options: sql_vdbe_max_steps = 45000 vtable_max_rows = 5000 diff --git a/sbroad-core/src/frontend/sql/ir/tests/limit.rs b/sbroad-core/src/frontend/sql/ir/tests/limit.rs new file mode 100644 index 0000000000000000000000000000000000000000..e3a8589ecb065d1a5a55800360ddcfd67721a4bd --- /dev/null +++ b/sbroad-core/src/frontend/sql/ir/tests/limit.rs @@ -0,0 +1,191 @@ +use crate::ir::transformation::helpers::sql_to_optimized_ir; +use pretty_assertions::assert_eq; + +#[test] +fn select() { + let sql = r#"SELECT "id" FROM "test_space" LIMIT 100"#; + let plan = sql_to_optimized_ir(sql, vec![]); + + let expected_explain = String::from( + r#"limit 100 + motion [policy: full] + limit 100 + projection ("test_space"."id"::unsigned -> "id") + scan "test_space" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn union_all() { + let sql = r#" + select "product_code" from "hash_testing" + union all + select "e" from "t2" + limit 100 + "#; + let plan = sql_to_optimized_ir(sql, vec![]); + + let expected_explain = String::from( + r#"limit 100 + motion [policy: full] + limit 100 + union all + projection ("hash_testing"."product_code"::string -> "product_code") + scan "hash_testing" + projection ("t2"."e"::unsigned -> "e") + scan "t2" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn aggretage() { + let input = r#"SELECT min("b"), min(distinct "b") FROM "t" LIMIT 1"#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"limit 1 + projection (min(("min_13"::unsigned))::scalar -> "COL_1", min(distinct ("column_15"::unsigned))::scalar -> "COL_2") + motion [policy: full] + scan + projection ("t"."b"::unsigned -> "column_15", min(("t"."b"::unsigned))::scalar -> "min_13") + group by ("t"."b"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id") + scan "t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn group_by() { + let input = r#"SELECT cOuNt(*), "b" FROM "t" group by "b" limit 555"#; + + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"limit 555 + motion [policy: full] + limit 555 + projection (sum(("count_26"::integer))::decimal -> "COL_1", "column_12"::unsigned -> "b") + group by ("column_12"::unsigned) output: ("column_12"::unsigned -> "column_12", "count_26"::integer -> "count_26") + motion [policy: segment([ref("column_12")])] + scan + projection ("t"."b"::unsigned -> "column_12", count((*::integer))::integer -> "count_26") + group by ("t"."b"::unsigned) output: ("t"."a"::unsigned -> "a", "t"."b"::unsigned -> "b", "t"."c"::unsigned -> "c", "t"."d"::unsigned -> "d", "t"."bucket_id"::unsigned -> "bucket_id") + scan "t" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn single_limit() { + let sql = r#"SELECT * FROM (SELECT "id" FROM "test_space" LIMIT 1) LIMIT 1"#; + let plan = sql_to_optimized_ir(sql, vec![]); + + let expected_explain = String::from( + r#"limit 1 + projection ("id"::unsigned -> "id") + scan + limit 1 + motion [policy: full] + limit 1 + projection ("test_space"."id"::unsigned -> "id") + scan "test_space" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn join() { + let input = r#"SELECT * FROM "t1" LEFT JOIN "t2" ON "t1"."a" = "t2"."e" + JOIN "t3" ON "t1"."a" = "t3"."a" JOIN "t4" ON "t2"."f" = "t4"."c" + LIMIT 128 +"#; + let plan = sql_to_optimized_ir(input, vec![]); + + let expected_explain = String::from( + r#"limit 128 + motion [policy: full] + limit 128 + projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b", "t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h", "t3"."a"::string -> "a", "t3"."b"::integer -> "b", "t4"."c"::string -> "c", "t4"."d"::integer -> "d") + join on ROW("t2"."f"::unsigned) = ROW("t4"."c"::string) + join on ROW("t1"."a"::string) = ROW("t3"."a"::string) + left join on ROW("t1"."a"::string) = ROW("t2"."e"::unsigned) + scan "t1" + projection ("t1"."a"::string -> "a", "t1"."b"::integer -> "b") + scan "t1" + motion [policy: full] + scan "t2" + projection ("t2"."e"::unsigned -> "e", "t2"."f"::unsigned -> "f", "t2"."g"::unsigned -> "g", "t2"."h"::unsigned -> "h") + scan "t2" + motion [policy: full] + scan "t3" + projection ("t3"."a"::string -> "a", "t3"."b"::integer -> "b") + scan "t3" + motion [policy: full] + scan "t4" + projection ("t4"."c"::string -> "c", "t4"."d"::integer -> "d") + scan "t4" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn limit_all() { + let sql = r#"SELECT "id" FROM "test_space" LIMIT ALL"#; + let plan = sql_to_optimized_ir(sql, vec![]); + + let expected_explain = String::from( + r#"projection ("test_space"."id"::unsigned -> "id") + scan "test_space" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + +#[test] +fn limit_null() { + let sql = r#"SELECT "id" FROM "test_space" LIMIT NULL"#; + let plan = sql_to_optimized_ir(sql, vec![]); + + let expected_explain = String::from( + r#"projection ("test_space"."id"::unsigned -> "id") + scan "test_space" +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} diff --git a/sbroad-core/src/frontend/sql/query.pest b/sbroad-core/src/frontend/sql/query.pest index 151a4bd00f5e492f3ab7073313d7e9d4eaf0a6d9..d360eb74511cc04c8496a55ecd246a5e5d21a3ea 100644 --- a/sbroad-core/src/frontend/sql/query.pest +++ b/sbroad-core/src/frontend/sql/query.pest @@ -160,15 +160,20 @@ ExplainQuery = _{ Explain } Query = { (SelectFull | Values | Insert | Update | Delete) ~ DqlOption? } SelectFull = { (^"with" ~ Cte ~ ("," ~ Cte)*)? - ~ SelectWithOptionalContinuation + ~ SelectStatement } + SelectStatement = { + SelectWithOptionalContinuation ~ Limit? + } + Limit = { ^"limit" ~ (Unsigned | LimitAll) } + LimitAll = { ^"all" | Null } SelectOp = _{ UnionAllOp | ExceptOp | UnionOp } UnionOp = { ^"union" } ExceptOp = { (^"except" ~ ^"distinct") | ^"except" } UnionAllOp = { ^"union" ~ ^"all" } SelectWithOptionalContinuation = { Select ~ (SelectOp ~ Select)* } Cte = { Identifier ~ ("(" ~ CteColumn ~ ("," ~ CteColumn)* ~ ")")? ~ ^"as" - ~ "(" ~ (SelectWithOptionalContinuation | Values) ~ ")" } + ~ "(" ~ (SelectStatement | Values) ~ ")" } CteColumn = @{ Identifier } Select = { ^"select" ~ Projection ~ ^"from" ~ Scan ~ @@ -246,6 +251,7 @@ Identifier = @{ DelimitedIdentifier | RegularIdentifier } | ^"table" | ^"text" | ^"then" | ^"to" | ^"true" | ^"union" | ^"unsigned" | ^"using" | ^"uuid" | ^"values" | ^"varchar" | ^"when" | ^"where" | ^"with" + | ^"limit" } Expr = { ExprAtomValue ~ (ExprInfixOp ~ ExprAtomValue)* } diff --git a/sbroad-core/src/ir/explain.rs b/sbroad-core/src/ir/explain.rs index da65e2d744421bb9b3c2b55f742f9f1306132160..fb985e4dfaab4ed9d395047c3b2a8c46c7e91399 100644 --- a/sbroad-core/src/ir/explain.rs +++ b/sbroad-core/src/ir/explain.rs @@ -897,6 +897,7 @@ enum ExplainNode { SubQuery(SubQuery), Motion(Motion), Cte(SmolStr, Ref), + Limit(u64), } impl Display for ExplainNode { @@ -923,6 +924,7 @@ impl Display for ExplainNode { ExplainNode::Update(u) => u.to_smolstr(), ExplainNode::SubQuery(s) => s.to_smolstr(), ExplainNode::Motion(m) => m.to_smolstr(), + ExplainNode::Limit(l) => format_smolstr!("limit {l}"), }; write!(f, "{s}") @@ -1344,6 +1346,17 @@ impl FullExplain { Some(ExplainNode::Delete(relation.to_smolstr())) } + Relational::Limit { limit, .. } => { + let child = stack.pop().ok_or_else(|| { + SbroadError::UnexpectedNumberOfValues( + "Limit node must have exactly one child".into(), + ) + })?; + + current_node.children.push(child); + + Some(ExplainNode::Limit(*limit)) + } }; stack.push(current_node); } diff --git a/sbroad-core/src/ir/helpers.rs b/sbroad-core/src/ir/helpers.rs index f553dd4b3235f60d06df77a17fdcfbc469b86b20..dbb5f1a82eac664b088e50a596655565286fed7d 100644 --- a/sbroad-core/src/ir/helpers.rs +++ b/sbroad-core/src/ir/helpers.rs @@ -341,6 +341,7 @@ impl Plan { Relational::Insert { .. } => writeln!(buf, "Insert")?, Relational::Intersect { .. } => writeln!(buf, "Intersect")?, Relational::Except { .. } => writeln!(buf, "Except")?, + Relational::Limit { limit, .. } => writeln!(buf, "Limit {limit}")?, } // Print children. match relation { @@ -359,7 +360,8 @@ impl Plan { | Relational::Update { .. } | Relational::Having { .. } | Relational::GroupBy { .. } - | Relational::ValuesRow { .. }) => { + | Relational::ValuesRow { .. } + | Relational::Limit { .. }) => { writeln_with_tabulation(buf, tabulation_number + 1, "Children:")?; for child in &node.children() { writeln_with_tabulation( @@ -401,7 +403,8 @@ impl Plan { | Relational::Union { output, .. } | Relational::UnionAll { output, .. } | Relational::Update { output, .. } - | Relational::ValuesRow { output, .. } => { + | Relational::ValuesRow { output, .. } + | Relational::Limit { output, .. } => { writeln_with_tabulation( buf, tabulation_number + 1, diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs index d7aa8c89f54e745ac4846ec7773d81a3d49f8d6c..b841733d3e6974ea9b6e24fda0824edbc78b50c8 100644 --- a/sbroad-core/src/ir/operator.rs +++ b/sbroad-core/src/ir/operator.rs @@ -501,6 +501,16 @@ pub enum Relational { /// should be empty. children: Vec<usize>, }, + Limit { + /// Output tuple. + output: usize, + // The limit value constant that comes after LIMIT keyword. + limit: u64, + /// Select statement that is being limited. + /// Note that it can be a complex statement, like SELECT .. UNION ALL SELECT .. LIMIT 100, + /// in that case limit is applied to the result of union. + child: usize, + }, } #[allow(dead_code)] @@ -527,7 +537,8 @@ impl Relational { | Relational::Union { output, .. } | Relational::UnionAll { output, .. } | Relational::Values { output, .. } - | Relational::ValuesRow { output, .. } => *output, + | Relational::ValuesRow { output, .. } + | Relational::Limit { output, .. } => *output, } } @@ -553,7 +564,8 @@ impl Relational { | Relational::Union { output, .. } | Relational::UnionAll { output, .. } | Relational::Values { output, .. } - | Relational::ValuesRow { output, .. } => output, + | Relational::ValuesRow { output, .. } + | Relational::Limit { output, .. } => output, } } @@ -561,9 +573,9 @@ impl Relational { #[must_use] pub fn children(&self) -> Children<'_> { match self { - Relational::OrderBy { child, .. } | Relational::ScanCte { child, .. } => { - Children::Single(child) - } + Relational::OrderBy { child, .. } + | Relational::ScanCte { child, .. } + | Relational::Limit { child, .. } => Children::Single(child), Relational::Except { left, right, .. } | Relational::Intersect { left, right, .. } | Relational::UnionAll { left, right, .. } @@ -589,9 +601,9 @@ impl Relational { pub fn mut_children(&mut self) -> MutChildren<'_> { // return MutChildren { node: self }; match self { - Relational::OrderBy { child, .. } | Relational::ScanCte { child, .. } => { - MutChildren::Single(child) - } + Relational::OrderBy { child, .. } + | Relational::ScanCte { child, .. } + | Relational::Limit { child, .. } => MutChildren::Single(child), Relational::Except { left, right, .. } | Relational::Intersect { left, right, .. } | Relational::UnionAll { left, right, .. } @@ -751,6 +763,13 @@ impl Relational { // It is safe to unwrap here, because the length is already checked above. *child = children[0]; } + Relational::Limit { ref mut child, .. } => { + if children.len() != 1 { + unreachable!("LIMIT may have only a single relational child"); + } + // It is safe to unwrap here, because the length is already checked above. + *child = children[0]; + } Relational::ScanRelation { .. } => { assert!(children.is_empty(), "scan must have no children!"); } @@ -826,7 +845,8 @@ impl Relational { | Relational::Union { .. } | Relational::UnionAll { .. } | Relational::Values { .. } - | Relational::ValuesRow { .. } => Ok(None), + | Relational::ValuesRow { .. } + | Relational::Limit { .. } => Ok(None), } } @@ -852,6 +872,7 @@ impl Relational { Relational::UnionAll { .. } => "UnionAll", Relational::Values { .. } => "Values", Relational::ValuesRow { .. } => "ValuesRow", + Relational::Limit { .. } => "Limit", } } @@ -1734,15 +1755,16 @@ impl Plan { if !columns.is_empty() { let mut child_output_id = child_node.output(); - // If the child node is VALUES, we need to wrap it with a subquery - // to change names in projection. - if matches!(child_node, Relational::Values { .. }) { + // Child must be a projection, but sometimes we need to get our hand dirty to maintain + // this invariant. For instance, child can be VALUES, LIMIT or UNION. In such cases + // we wrap the child with a subquery and change names in the subquery's projection. + if !matches!(child_node, Relational::Projection { .. }) { let sq_id = self .add_sub_query(child_id, Some(&alias)) - .expect("add subquery for values"); + .expect("add subquery in cte"); child_id = self .add_proj(sq_id, &[], false, false) - .expect("add projection for values"); + .expect("add projection in cte"); child_output_id = self .get_relational_output(child_id) .expect("projection has an output tuple"); @@ -1832,6 +1854,23 @@ impl Plan { Ok(union_id) } + /// Adds a limit node to the top of select node. + /// + /// # Errors + /// - Row node is not of a row type + pub fn add_limit(&mut self, select: usize, limit: u64) -> Result<usize, SbroadError> { + let output = self.add_row_for_output(select, &[], true)?; + let limit = Relational::Limit { + output, + limit, + child: select, + }; + + let limit_id = self.add_relational(limit)?; + self.replace_parent_in_subtree(output, None, Some(limit_id))?; + Ok(limit_id) + } + /// Adds a values row node. /// /// # Errors diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index 6a233a86e559d23e44d2bdc63794200144fdeba5..afd9e0f486dfe87d47ec6073469c596c180db351 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -2169,6 +2169,35 @@ impl Plan { | Relational::ValuesRow { output, .. } => { self.set_distribution(output)?; } + Relational::Limit { output, limit, .. } => { + let rel_child_id = self.get_relational_child(id, 0)?; + let child_dist = + self.get_distribution(self.get_relational_output(rel_child_id)?)?; + + match child_dist { + Distribution::Single | Distribution::Global => { + // All rows on a single node, no motion needed. + self.set_dist(output, child_dist.clone())?; + } + Distribution::Any | Distribution::Segment { .. } => { + // Rows are distributed, so motion needed with full policy to + // bring them on a single node. + let child_dist = child_dist.clone(); + // We don't need more than limit rows, so we can add a limit for the + // queries sent during the map stage. + let limit_id = self.add_limit(id, limit)?; + self.set_dist( + self.get_relational_output(limit_id)?, + Distribution::Single, + )?; + old_new.insert(id, limit_id); + let mut strategy = Strategy::new(limit_id); + strategy.add_child(id, MotionPolicy::Full, Program::default()); + self.create_motion_nodes(strategy)?; + self.set_dist(output, child_dist)?; + } + } + } Relational::OrderBy { output, .. } => { let rel_child_id = self.get_relational_child(id, 0)?; diff --git a/sbroad-core/src/ir/tree/relation.rs b/sbroad-core/src/ir/tree/relation.rs index 0bb709fb5a9fa1f3fc4a6e4831bb8f48ddd4e825..40733e65efb733ed222d7314e06d7b9c1264e9ed 100644 --- a/sbroad-core/src/ir/tree/relation.rs +++ b/sbroad-core/src/ir/tree/relation.rs @@ -98,7 +98,9 @@ fn relational_next<'nodes>( None } Some(Node::Relational( - Relational::OrderBy { child, .. } | Relational::ScanCte { child, .. }, + Relational::OrderBy { child, .. } + | Relational::ScanCte { child, .. } + | Relational::Limit { child, .. }, )) => { let step = *iter.get_child().borrow(); if step == 0 { diff --git a/sbroad-core/src/ir/tree/subtree.rs b/sbroad-core/src/ir/tree/subtree.rs index 9c69a32e1bf3888fdff3f599ad4877266c36bca2..ce331404a2459748dec08cc28e287d5e0df2205b 100644 --- a/sbroad-core/src/ir/tree/subtree.rs +++ b/sbroad-core/src/ir/tree/subtree.rs @@ -319,7 +319,8 @@ fn subtree_next<'plan>( } None } - Relational::ScanCte { child, output, .. } => { + Relational::ScanCte { child, output, .. } + | Relational::Limit { child, output, .. } => { let step = *iter.get_child().borrow(); if step == 0 { *iter.get_child().borrow_mut() += 1;