diff --git a/doc/sql/query.ebnf b/doc/sql/query.ebnf index 33eb0cc2683fa5e8904d2d69401736dfd3cf626e..6e5995da7fb97e49c22ea6e266e55615f8079e18 100644 --- a/doc/sql/query.ebnf +++ b/doc/sql/query.ebnf @@ -128,7 +128,9 @@ alter_system ::= 'ALTER' 'SYSTEM' ('FOR' ('ALL' 'TIERS' | 'TIER' tier))? ('OPTION' '(' ('TIMEOUT' '=' double)')')? alter_procedure ::= 'ALTER' 'PROCEDURE' procedure ('(' type (',' type)* ')')? - 'RENAME' 'TO' procedure ('OPTION' '(' ('TIMEOUT' '=' double)')')? + 'RENAME' 'TO' procedure + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? + ('OPTION' '(' ('TIMEOUT' '=' double)')')? create_index ::= 'CREATE' 'UNIQUE'? 'INDEX' ('IF' 'NOT' 'EXISTS')? index 'ON' table ('USING' ('TREE' | 'HASH' | 'RTREE' | 'BITSET'))? '(' column (',' column)* ')' ('WITH' '(' @@ -155,12 +157,16 @@ create_index ::= 'CREATE' 'UNIQUE'? 'INDEX' ('IF' 'NOT' 'EXISTS')? index 'ON' ta | ('HINT' '=' ('TRUE' | 'FALSE')) ) )* - ')')? ('OPTION' '(' ('TIMEOUT' '=' double)')')? + ')')? + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? + ('OPTION' '(' ('TIMEOUT' '=' double)')')? create_procedure ::= 'CREATE' 'PROCEDURE' ('IF' 'NOT' 'EXISTS')? procedure '(' type (',' type)* ')' ('LANGUAGE' 'SQL')? ( ('AS' '$$' (insert | update | delete) '$$') | ('BEGIN' 'ATOMIC' (insert | update | delete) 'END') - ) ('OPTION' '(' ('TIMEOUT' '=' double)')')? + ) + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? + ('OPTION' '(' ('TIMEOUT' '=' double)')')? create_role ::= 'CREATE' 'ROLE' ('IF' 'NOT' 'EXISTS')? role create_table ::= 'CREATE' 'TABLE' ('IF' 'NOT' 'EXISTS')? table '(' @@ -169,6 +175,7 @@ create_table ::= 'CREATE' 'TABLE' ('IF' 'NOT' 'EXISTS')? table ')' ('USING' ('MEMTX' | 'VINYL'))? (('DISTRIBUTED' (('BY' '(' column (',' column)* ')' ('IN' 'TIER' tier)?) | 'GLOBALLY'))?)? + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? ('OPTION' '(' ('TIMEOUT' '=' double)')')? create_user ::= 'CREATE' 'USER' ('IF' 'NOT' 'EXISTS')? user 'WITH'? 'PASSWORD' "'" password "'" ('USING' ('CHAP-SHA1' | 'LDAP' | 'MD5'))? @@ -179,9 +186,15 @@ alter_user ::= 'ALTER' 'USER' user | 'PASSWORD' "'" password "'" ('USING' ('CHAP-SHA1' | 'LDAP' | 'MD5'))? | 'RENAME' 'TO' user ) -drop_index ::= 'DROP' 'INDEX' ('IF' 'EXISTS')? index ('OPTION' '(' ('TIMEOUT' '=' double)')')? -drop_procedure ::= 'DROP' 'PROCEDURE' ('IF' 'EXISTS')? procedure ('(' type (',' type)* ')')? ('OPTION' '(' ('TIMEOUT' '=' double)')')? -drop_table ::= 'DROP' 'TABLE' ('IF' 'EXISTS')? table ('OPTION' '(' ('TIMEOUT' '=' double)')')? +drop_index ::= 'DROP' 'INDEX' ('IF' 'EXISTS')? index + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? + ('OPTION' '(' ('TIMEOUT' '=' double)')')? +drop_procedure ::= 'DROP' 'PROCEDURE' ('IF' 'EXISTS')? procedure ('(' type (',' type)* ')')? + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? + ('OPTION' '(' ('TIMEOUT' '=' double)')')? +drop_table ::= 'DROP' 'TABLE' ('IF' 'EXISTS')? table + ('WAIT' 'APPLIED' ('GLOBALLY' | 'LOCALLY'))? + ('OPTION' '(' ('TIMEOUT' '=' double)')')? drop_role ::= 'DROP' 'ROLE' ('IF' 'EXISTS')? role drop_user ::= 'DROP' 'USER' ('IF' 'EXISTS')? user create_plugin ::= 'CREATE' 'PLUGIN' ('IF' 'NOT' 'EXISTS')? plugin version diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs index 051a0ad3e516aaf139afb9b9eb4c45834761ae9d..524486d5d7114df70cb72f97a4b1377d330f3c5d 100644 --- a/sbroad-core/src/frontend/sql.rs +++ b/sbroad-core/src/frontend/sql.rs @@ -75,6 +75,8 @@ const DEFAULT_AUTH_METHOD: &str = "md5"; const DEFAULT_IF_EXISTS: bool = false; const DEFAULT_IF_NOT_EXISTS: bool = false; +const DEFAULT_WAIT_APPLIED_GLOBALLY: bool = true; + fn get_default_timeout() -> Decimal { Decimal::from_str(&format!("{DEFAULT_TIMEOUT_F64}")).expect("default timeout casting failed") } @@ -232,6 +234,7 @@ fn parse_rename_proc( let mut new_name = SmolStr::default(); let mut params: Option<Vec<ParamDef>> = None; let mut timeout = get_default_timeout(); + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; for child_id in &node.children { let child_node = ast.nodes.get_node(*child_id)?; match child_node.rule { @@ -247,6 +250,8 @@ fn parse_rename_proc( Rule::Timeout => { timeout = get_timeout(ast, *child_id)?; } + Rule::WaitAppliedGlobally => wait_applied_globally = true, + Rule::WaitAppliedLocally => wait_applied_globally = false, _ => panic!("Unexpected node: {child_node:?}"), } } @@ -255,6 +260,7 @@ fn parse_rename_proc( new_name, params, timeout, + wait_applied_globally, }) } @@ -268,6 +274,7 @@ fn parse_create_proc( let mut body = SmolStr::default(); let mut if_not_exists = DEFAULT_IF_NOT_EXISTS; let mut timeout = get_default_timeout(); + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; for child_id in &node.children { let child_node = ast.nodes.get_node(*child_id)?; match child_node.rule { @@ -290,6 +297,8 @@ fn parse_create_proc( Rule::Timeout => { timeout = get_timeout(ast, *child_id)?; } + Rule::WaitAppliedGlobally => wait_applied_globally = true, + Rule::WaitAppliedLocally => wait_applied_globally = false, _ => unreachable!("Unexpected node: {child_node:?}"), } } @@ -302,6 +311,7 @@ fn parse_create_proc( body, if_not_exists, timeout, + wait_applied_globally, }; Ok(create_proc) } @@ -417,6 +427,7 @@ fn parse_drop_proc(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropPro let mut params = None; let mut timeout = get_default_timeout(); let mut if_exists = DEFAULT_IF_EXISTS; + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; for child_id in &node.children { let child_node = ast.nodes.get_node(*child_id)?; match child_node.rule { @@ -425,8 +436,10 @@ fn parse_drop_proc(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropPro name = Some(proc_name); params = proc_params; } - Rule::TimeoutOption => timeout = get_timeout(ast, *child_id)?, + Rule::Timeout => timeout = get_timeout(ast, *child_id)?, Rule::IfExists => if_exists = true, + Rule::WaitAppliedGlobally => wait_applied_globally = true, + Rule::WaitAppliedLocally => wait_applied_globally = false, child_node => panic!("unexpected node {child_node:?}"), } } @@ -437,6 +450,7 @@ fn parse_drop_proc(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropPro params, if_exists, timeout, + wait_applied_globally, }) } @@ -461,6 +475,7 @@ fn parse_create_index( let mut hint = None; let mut if_not_exists = DEFAULT_IF_NOT_EXISTS; let mut timeout = get_default_timeout(); + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; let first_child = |node: &ParseNode| -> &ParseNode { let child_id = node.children.first().expect("Expected to see first child"); @@ -559,6 +574,8 @@ fn parse_create_index( } } Rule::Timeout => timeout = get_timeout(ast, *child_id)?, + Rule::WaitAppliedGlobally => wait_applied_globally = true, + Rule::WaitAppliedLocally => wait_applied_globally = false, _ => panic!("Unexpected index rule: {child_node:?}"), } } @@ -578,6 +595,7 @@ fn parse_create_index( distance, hint, timeout, + wait_applied_globally, }; Ok(index) } @@ -587,12 +605,15 @@ fn parse_drop_index(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropIn let mut name = SmolStr::default(); let mut timeout = get_default_timeout(); let mut if_exists = DEFAULT_IF_EXISTS; + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; for child_id in &node.children { let child_node = ast.nodes.get_node(*child_id)?; match child_node.rule { Rule::Identifier => name = parse_identifier(ast, *child_id)?, Rule::Timeout => timeout = get_timeout(ast, *child_id)?, Rule::IfExists => if_exists = true, + Rule::WaitAppliedGlobally => wait_applied_globally = true, + Rule::WaitAppliedLocally => wait_applied_globally = false, _ => panic!("Unexpected drop index node: {child_node:?}"), } } @@ -600,6 +621,7 @@ fn parse_drop_index(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropIn name, if_exists, timeout, + wait_applied_globally, }) } @@ -625,6 +647,7 @@ fn parse_create_table( let mut tier = None; let mut is_global = false; let mut if_not_exists = DEFAULT_IF_NOT_EXISTS; + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; let nullable_primary_key_column_error = Err(SbroadError::Invalid( Entity::Column, @@ -871,6 +894,12 @@ fn parse_create_table( Rule::Timeout => { timeout = get_timeout(ast, *child_id)?; } + Rule::WaitAppliedGlobally => { + wait_applied_globally = true; + } + Rule::WaitAppliedLocally => { + wait_applied_globally = false; + } _ => panic!("Unexpected rule met under CreateTable."), } } @@ -904,6 +933,7 @@ fn parse_create_table( sharding_key, engine_type, if_not_exists, + wait_applied_globally, timeout, tier, }) @@ -913,6 +943,7 @@ fn parse_drop_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropTa let mut table_name = SmolStr::default(); let mut timeout = get_default_timeout(); let mut if_exists = DEFAULT_IF_EXISTS; + let mut wait_applied_globally = DEFAULT_WAIT_APPLIED_GLOBALLY; for child_id in &node.children { let child_node = ast.nodes.get_node(*child_id)?; match child_node.rule { @@ -925,6 +956,12 @@ fn parse_drop_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropTa Rule::Timeout => { timeout = get_timeout(ast, *child_id)?; } + Rule::WaitAppliedGlobally => { + wait_applied_globally = true; + } + Rule::WaitAppliedLocally => { + wait_applied_globally = false; + } _ => { return Err(SbroadError::Invalid( Entity::Node, @@ -939,6 +976,7 @@ fn parse_drop_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<DropTa Ok(DropTable { name: table_name, if_exists, + wait_applied_globally, timeout, }) } diff --git a/sbroad-core/src/frontend/sql/query.pest b/sbroad-core/src/frontend/sql/query.pest index d4883bdd6886c35c933495b302d56f845ab8d7ac..0bd3d46a397d8df1ca0cede2bc096223930ef394 100644 --- a/sbroad-core/src/frontend/sql/query.pest +++ b/sbroad-core/src/frontend/sql/query.pest @@ -78,10 +78,15 @@ ACL = _{ DropRole | DropUser | CreateRole | CreateUser | AlterUser | GrantPrivil DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex | CreateProc | DropProc | RenameProc | SetParam | SetTransaction | AlterSystem } + + WaitApplied = _{ WaitAppliedGlobally | WaitAppliedLocally } + WaitAppliedGlobally = { ^"wait" ~ ^"applied" ~ ^"globally" } + WaitAppliedLocally = { ^"wait" ~ ^"applied" ~ ^"locally" } + CreateTable = { ^"create" ~ ^"table" ~ IfNotExists? ~ NewTable ~ "(" ~ Columns ~ ("," ~ PrimaryKey)? ~ ")" ~ - Engine? ~ Distribution? ~ TimeoutOption? + Engine? ~ Distribution? ~ WaitApplied? ~ TimeoutOption? } NewTable = @{Table} Columns = { ColumnDef ~ ("," ~ ColumnDef)* } @@ -96,29 +101,28 @@ DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex Global = { ^"globally" } Sharding = { ^"by" ~ "(" ~ Identifier ~ ("," ~ Identifier)* ~ ")" ~ Tier? } Tier = { ^"in" ~ ^"tier" ~ Identifier } - DropTable = { ^"drop" ~ ^"table" ~ IfExists? ~ Table ~ TimeoutOption? } - + DropTable = { ^"drop" ~ ^"table" ~ IfExists? ~ Table ~ WaitApplied? ~ TimeoutOption? } CreateProc = { ^"create" ~ ^"procedure" ~ IfNotExists? ~ Identifier ~ ProcParams ~ (^"language" ~ ProcLanguage)? ~ ((^"as" ~ "$$" ~ ProcBody ~ "$$") | (^"begin" ~ "atomic" ~ ProcBody ~ "end")) - ~ TimeoutOption? + ~ WaitApplied? ~ TimeoutOption? } ProcParams = { "(" ~ (ColumnDefType ~ ("," ~ ColumnDefType)*)? ~ ")" } ProcLanguage = { SQL } SQL = { ^"sql" } ProcBody = { (Insert | Update | Delete) } - DropProc = { ^"drop" ~ ^"procedure" ~ IfExists? ~ ProcWithOptionalParams ~ TimeoutOption? } + DropProc = { ^"drop" ~ ^"procedure" ~ IfExists? ~ ProcWithOptionalParams ~ WaitApplied? ~ TimeoutOption? } ProcWithOptionalParams = { Identifier ~ ProcParams? } - RenameProc = { ^"alter" ~ ^"procedure" ~ OldProc ~ ProcParams? ~ ^"rename" ~ ^"to" ~ NewProc ~ TimeoutOption? } + RenameProc = { ^"alter" ~ ^"procedure" ~ OldProc ~ ProcParams? ~ ^"rename" ~ ^"to" ~ NewProc ~ WaitApplied? ~ TimeoutOption? } OldProc = @{ Identifier } NewProc = @{ Identifier } CreateIndex = { ^"create" ~ Unique? ~ ^"index" ~ IfNotExists? ~ Identifier ~ ^"on" ~ Table - ~ IndexType? ~ "(" ~ Parts ~ ")" ~ IndexOptions? ~ TimeoutOption? + ~ IndexType? ~ "(" ~ Parts ~ ")" ~ IndexOptions? ~ WaitApplied? ~ TimeoutOption? } Unique = { ^"unique" } IndexType = { ^"using" ~ (Tree | Hash | RTree | BitSet) } @@ -140,8 +144,7 @@ DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex Euclid = { ^"euclid" } Manhattan = { ^"manhattan" } Hint = { ^"hint" ~ "=" ~ (True | False) } - - DropIndex = { ^"drop" ~ ^"index" ~ IfExists? ~ Identifier ~ TimeoutOption? } + DropIndex = { ^"drop" ~ ^"index" ~ IfExists? ~ Identifier ~ WaitApplied? ~ TimeoutOption? } SetParam = { ^"set" ~ SetScope? ~ ConfParam } SetScope = { ScopeSession | ScopeLocal } diff --git a/sbroad-core/src/ir/node.rs b/sbroad-core/src/ir/node.rs index dbd7cb7cf1cf4aaea6bd46da8c342c8eafb4c170..9872f74d0d2b25b3462b9ad0e4e88907712cd4b0 100644 --- a/sbroad-core/src/ir/node.rs +++ b/sbroad-core/src/ir/node.rs @@ -843,6 +843,7 @@ pub struct CreateTable { /// Vinyl is supported only for sharded tables. pub engine_type: SpaceEngineType, pub if_not_exists: bool, + pub wait_applied_globally: bool, pub timeout: Decimal, /// Shows which tier the sharded table belongs to. /// Field has value, only if it was specified in [ON TIER] part of CREATE TABLE statement. @@ -862,6 +863,7 @@ impl From<CreateTable> for NodeAligned { pub struct DropTable { pub name: SmolStr, pub if_exists: bool, + pub wait_applied_globally: bool, pub timeout: Decimal, } @@ -878,6 +880,7 @@ pub struct CreateProc { pub body: SmolStr, pub if_not_exists: bool, pub language: Language, + pub wait_applied_globally: bool, pub timeout: Decimal, } @@ -892,6 +895,7 @@ pub struct DropProc { pub name: SmolStr, pub params: Option<Vec<ParamDef>>, pub if_exists: bool, + pub wait_applied_globally: bool, pub timeout: Decimal, } @@ -906,6 +910,7 @@ pub struct RenameRoutine { pub old_name: SmolStr, pub new_name: SmolStr, pub params: Option<Vec<ParamDef>>, + pub wait_applied_globally: bool, pub timeout: Decimal, } @@ -931,6 +936,7 @@ pub struct CreateIndex { pub dimension: Option<u32>, pub distance: Option<RtreeIndexDistanceType>, pub hint: Option<bool>, + pub wait_applied_globally: bool, pub timeout: Decimal, } @@ -944,6 +950,7 @@ impl From<CreateIndex> for NodeAligned { pub struct DropIndex { pub name: SmolStr, pub if_exists: bool, + pub wait_applied_globally: bool, pub timeout: Decimal, } diff --git a/sbroad-core/src/ir/node/ddl.rs b/sbroad-core/src/ir/node/ddl.rs index e49cd10bc89fad9655dd5093b69e2d7201858dc4..85c5c0a86d13c660a918cfd9634ee36385009da2 100644 --- a/sbroad-core/src/ir/node/ddl.rs +++ b/sbroad-core/src/ir/node/ddl.rs @@ -50,6 +50,40 @@ impl DdlOwned { ) }) } + + pub fn wait_applied_globally(&self) -> bool { + match self { + DdlOwned::CreateTable(CreateTable { + wait_applied_globally, + .. + }) + | DdlOwned::DropTable(DropTable { + wait_applied_globally, + .. + }) + | DdlOwned::CreateIndex(CreateIndex { + wait_applied_globally, + .. + }) + | DdlOwned::DropIndex(DropIndex { + wait_applied_globally, + .. + }) + | DdlOwned::CreateProc(CreateProc { + wait_applied_globally, + .. + }) + | DdlOwned::DropProc(DropProc { + wait_applied_globally, + .. + }) + | DdlOwned::RenameRoutine(RenameRoutine { + wait_applied_globally, + .. + }) => *wait_applied_globally, + _ => false, + } + } } impl From<DdlOwned> for NodeAligned { @@ -127,6 +161,40 @@ impl Ddl<'_> { }) } + pub fn wait_applied_globally(&self) -> bool { + match self { + Ddl::CreateTable(CreateTable { + wait_applied_globally, + .. + }) + | Ddl::DropTable(DropTable { + wait_applied_globally, + .. + }) + | Ddl::CreateIndex(CreateIndex { + wait_applied_globally, + .. + }) + | Ddl::DropIndex(DropIndex { + wait_applied_globally, + .. + }) + | Ddl::CreateProc(CreateProc { + wait_applied_globally, + .. + }) + | Ddl::DropProc(DropProc { + wait_applied_globally, + .. + }) + | Ddl::RenameRoutine(RenameRoutine { + wait_applied_globally, + .. + }) => *wait_applied_globally, + _ => false, + } + } + #[must_use] pub fn get_ddl_owned(&self) -> DdlOwned { match self {