diff --git a/sbroad/sbroad-cartridge/src/api/exec_query.rs b/sbroad/sbroad-cartridge/src/api/exec_query.rs index 3023368cbee930155a3ab71ea146bb9b162d7cee..b0c1eaec30cc84b532adf6221a53f2d72d45d66d 100644 --- a/sbroad/sbroad-cartridge/src/api/exec_query.rs +++ b/sbroad/sbroad-cartridge/src/api/exec_query.rs @@ -39,6 +39,9 @@ fn dispatch_query_inner(args: &RawBytes) -> anyhow::Result<RawProcResult> { if let Ok(true) = query.is_acl() { bail!("ACL queries are not supported"); } + if let Ok(true) = query.is_tcl() { + bail!("TCL queries are not supported"); + } if let Ok(true) = query.is_block() { bail!("blocks of commands are not supported"); } diff --git a/sbroad/sbroad-cartridge/test_app/test/integration/tcl.lua b/sbroad/sbroad-cartridge/test_app/test/integration/tcl.lua new file mode 100644 index 0000000000000000000000000000000000000000..32b457d66b15b0868e39f475187c340ea1a31085 --- /dev/null +++ b/sbroad/sbroad-cartridge/test_app/test/integration/tcl.lua @@ -0,0 +1,25 @@ +local t = require('luatest') +local g = t.group('sbroad_with_tcl') + +local helper = require('test.helper.cluster_no_replication') + +g.after_all(function() + helper.stop_test_cluster() +end) + +g.test_transaction_commands = function() + local r, err + local api = helper.cluster:server("api-1").net_box + + r, err = api:call("sbroad.execute", { [[BEGIN]] }) + t.assert_equals(err, nil) + t.assert_equals(r["row_count"], 0) + + r, err = api:call("sbroad.execute", { [[COMMIT]] }) + t.assert_equals(err, nil) + t.assert_equals(r["row_count"], 0) + + r, err = api:call("sbroad.execute", { [[ROLLBACK]] }) + t.assert_equals(err, nil) + t.assert_equals(r["row_count"], 0) +end \ No newline at end of file diff --git a/sbroad/sbroad-core/src/backend/sql/ir.rs b/sbroad/sbroad-core/src/backend/sql/ir.rs index c6f0f76b6545237baed92cffea8a627c102780c7..c90ea3d45d1d179942e60800039bb303ee665e27 100644 --- a/sbroad/sbroad-core/src/backend/sql/ir.rs +++ b/sbroad/sbroad-core/src/backend/sql/ir.rs @@ -318,6 +318,12 @@ impl ExecutionPlan { Some("ACL nodes are not supported in the generated SQL".into()), )); } + Node::Tcl(_) => { + return Err(SbroadError::Unsupported( + Entity::Node, + Some("TCL nodes are not supported in the generated SQL".into()), + )); + } Node::Block(_) => { return Err(SbroadError::Unsupported( Entity::Node, diff --git a/sbroad/sbroad-core/src/backend/sql/tree.rs b/sbroad/sbroad-core/src/backend/sql/tree.rs index 06b2706b3c777718356bf0dd86ec66899e76f1d0..ca934e274201eae12540ff99c919b04387b1d8ba 100644 --- a/sbroad/sbroad-core/src/backend/sql/tree.rs +++ b/sbroad/sbroad-core/src/backend/sql/tree.rs @@ -771,6 +771,7 @@ impl<'p> SyntaxPlan<'p> { | Node::Plugin(..) | Node::Ddl(..) | Node::Acl(..) + | Node::Tcl(..) | Node::Block(..) => { panic!("Node {node:?} is not supported in the syntax plan") } diff --git a/sbroad/sbroad-core/src/executor.rs b/sbroad/sbroad-core/src/executor.rs index ef9575d34a8c878fbb37fa0a67967a7ba0177d86..5a572420d975e3dc4ed9fb5f5f15c99c95cdab4b 100644 --- a/sbroad/sbroad-core/src/executor.rs +++ b/sbroad/sbroad-core/src/executor.rs @@ -377,6 +377,14 @@ where self.exec_plan.get_ir_plan().is_acl() } + /// Checks that query is TCL. + /// + /// # Errors + /// - Plan is invalid + pub fn is_tcl(&self) -> Result<bool, SbroadError> { + self.exec_plan.get_ir_plan().is_tcl() + } + #[cfg(test)] pub fn get_motion_id(&self, slice_id: usize, pos_idx: usize) -> NodeId { *self diff --git a/sbroad/sbroad-core/src/executor/ir.rs b/sbroad/sbroad-core/src/executor/ir.rs index 701fe4e416b328a6f95030bf31528a72db04ca2b..0c973838f9c32eec1567a8d088dc3baf38a0a9bc 100644 --- a/sbroad/sbroad-core/src/executor/ir.rs +++ b/sbroad/sbroad-core/src/executor/ir.rs @@ -813,6 +813,7 @@ impl ExecutionPlan { NodeOwned::Invalid { .. } | NodeOwned::Ddl { .. } | NodeOwned::Acl { .. } + | NodeOwned::Tcl(_) | NodeOwned::Plugin { .. } | NodeOwned::Deallocate { .. } | NodeOwned::Block { .. } => { diff --git a/sbroad/sbroad-core/src/frontend/sql.rs b/sbroad/sbroad-core/src/frontend/sql.rs index 281b28b2598dd2507e499ca0d04e2d4e702bbac1..2d9edabbaf23a9a7a6df0de0035392ee7453367f 100644 --- a/sbroad/sbroad-core/src/frontend/sql.rs +++ b/sbroad/sbroad-core/src/frontend/sql.rs @@ -4,6 +4,7 @@ //! and builds the intermediate representation (IR). use crate::ir::node::deallocate::Deallocate; +use crate::ir::node::tcl::Tcl; use crate::ir::node::{Reference, ReferenceAsteriskSource}; use ahash::{AHashMap, AHashSet}; use core::panic; @@ -4606,6 +4607,21 @@ impl AbstractSyntaxTree { let plan_id = plan.nodes.push(deallocate.into()); map.add(id, plan_id); } + Rule::Begin => { + let begin = Tcl::Begin; + let plan_id = plan.nodes.push(begin.into()); + map.add(id, plan_id); + } + Rule::Commit => { + let commit = Tcl::Commit; + let plan_id = plan.nodes.push(commit.into()); + map.add(id, plan_id); + } + Rule::Rollback => { + let rollback = Tcl::Rollback; + let plan_id = plan.nodes.push(rollback.into()); + map.add(id, plan_id); + } _ => {} } } diff --git a/sbroad/sbroad-core/src/frontend/sql/query.pest b/sbroad/sbroad-core/src/frontend/sql/query.pest index 32c1b4eec68cf5f9fbe9486ca52cf92b7a8f5ec2..c443a572730c2a72d00f97086688e8b4f07e3768 100644 --- a/sbroad/sbroad-core/src/frontend/sql/query.pest +++ b/sbroad/sbroad-core/src/frontend/sql/query.pest @@ -1,4 +1,4 @@ -Command = _{ SOI ~ (Query | ExplainQuery | Block | DDL | ACL | Plugin | Deallocate | EmptyQuery) ~ EOF } +Command = _{ SOI ~ (Query | ExplainQuery | Block | DDL | ACL | TCL | Plugin | Deallocate | EmptyQuery) ~ EOF } // Helper rule to denote we have to update plan relations from metadata // (with Table which name corresponds to current node). @@ -313,6 +313,11 @@ Identifier = @{ DelimitedIdentifier | RegularIdentifier } Deallocate = ${ ^"deallocate" ~ (WithPrepare | WithoutPrepare) } WithPrepare = _{ W ~ ^"prepare" ~ W ~ (^"all" | Identifier) } WithoutPrepare = _{ W ~ (^"all" | Identifier) } + +TCL = _{ Begin | Commit | Rollback } + Begin = { ^"begin" } + Commit = { ^"commit" } + Rollback = { ^"rollback" } EmptyQuery = { WHITESPACE* } diff --git a/sbroad/sbroad-core/src/ir.rs b/sbroad/sbroad-core/src/ir.rs index 511dd4321a48e0a01458054cbe03cec5c4ce8179..4b804ed9e6629e5c9fd28cdaec5bd797ee1af454 100644 --- a/sbroad/sbroad-core/src/ir.rs +++ b/sbroad/sbroad-core/src/ir.rs @@ -26,6 +26,7 @@ use crate::errors::{Action, Entity, SbroadError, TypeError}; use crate::executor::engine::helpers::to_user; use crate::executor::engine::TableVersionMap; use crate::ir::node::plugin::{MutPlugin, Plugin}; +use crate::ir::node::tcl::Tcl; use crate::ir::node::{ Alias, ArenaType, ArithmeticExpr, BoolExpr, Case, Cast, Concat, Constant, ExprInParentheses, GroupBy, Having, Insert, Limit, Motion, MutNode, Node, Node136, Node232, Node32, Node64, @@ -104,6 +105,11 @@ impl Nodes { Node32::UnionAll(union_all) => Node::Relational(Relational::UnionAll(union_all)), Node32::Values(values) => Node::Relational(Relational::Values(values)), Node32::Deallocate(deallocate) => Node::Deallocate(deallocate), + Node32::Tcl(tcl) => match *tcl { + Tcl::Begin => Node::Tcl(Tcl::Begin), + Tcl::Commit => Node::Tcl(Tcl::Commit), + Tcl::Rollback => Node::Tcl(Tcl::Rollback), + }, }), ArenaType::Arena64 => self.arena64.get(id.offset as usize).map(|node| match node { Node64::Case(case) => Node::Expression(Expression::Case(case)), @@ -230,6 +236,11 @@ impl Nodes { } Node32::Values(values) => MutNode::Relational(MutRelational::Values(values)), Node32::Deallocate(deallocate) => MutNode::Deallocate(deallocate), + Node32::Tcl(tcl) => match *tcl { + Tcl::Begin => MutNode::Tcl(node::tcl::Tcl::Begin), + Tcl::Commit => MutNode::Tcl(node::tcl::Tcl::Commit), + Tcl::Rollback => MutNode::Tcl(node::tcl::Tcl::Rollback), + }, }), ArenaType::Arena64 => self .arena64 @@ -1303,6 +1314,16 @@ impl Plan { Ok(matches!(top, Node::Acl(_))) } + /// Checks that plan is TCL query. + /// + /// # Errors + /// - top node doesn't exist in the plan or is invalid. + pub fn is_tcl(&self) -> Result<bool, SbroadError> { + let top_id = self.get_top()?; + let top = self.get_node(top_id)?; + Ok(matches!(top, Node::Tcl(_))) + } + /// Checks that plan is a plugin query. /// /// # Errors @@ -1344,6 +1365,7 @@ impl Plan { | Node::Ddl(..) | Node::Invalid(..) | Node::Acl(..) + | Node::Tcl(..) | Node::Block(..) | Node::Plugin(..) | Node::Deallocate(..) => Err(SbroadError::Invalid( @@ -1366,6 +1388,7 @@ impl Plan { | MutNode::Ddl(..) | MutNode::Invalid(..) | MutNode::Acl(..) + | MutNode::Tcl(..) | MutNode::Plugin(..) | MutNode::Deallocate(..) | MutNode::Block(..) => Err(SbroadError::Invalid( @@ -1426,6 +1449,7 @@ impl Plan { | MutNode::Ddl(..) | MutNode::Invalid(..) | MutNode::Acl(..) + | MutNode::Tcl(..) | MutNode::Plugin(..) | MutNode::Deallocate(..) | MutNode::Block(..) => Err(SbroadError::Invalid( diff --git a/sbroad/sbroad-core/src/ir/api/parameter.rs b/sbroad/sbroad-core/src/ir/api/parameter.rs index ed3d608cc0d2a46e8842ff5d0ce22f5427b4c2fd..baca13c399bc6a7959cdb69d7f43a145463cffaa 100644 --- a/sbroad/sbroad-core/src/ir/api/parameter.rs +++ b/sbroad/sbroad-core/src/ir/api/parameter.rs @@ -371,6 +371,7 @@ impl<'binder> ParamsBinder<'binder> { | Node::Parameter(..) | Node::Ddl(..) | Node::Acl(..) + | Node::Tcl(..) | Node::Plugin(_) | Node::Deallocate(..) => {} } @@ -557,6 +558,7 @@ impl<'binder> ParamsBinder<'binder> { MutNode::Invalid(..) | MutNode::Parameter(..) | MutNode::Ddl(..) + | MutNode::Tcl(..) | MutNode::Plugin(_) | MutNode::Acl(..) | MutNode::Deallocate(..) => {} diff --git a/sbroad/sbroad-core/src/ir/block.rs b/sbroad/sbroad-core/src/ir/block.rs index a6ffa29aec54718db6001624636b572977cc6a90..4659a8d92f881daf86b6c9a65a83c5ef5d28e372 100644 --- a/sbroad/sbroad-core/src/ir/block.rs +++ b/sbroad/sbroad-core/src/ir/block.rs @@ -20,6 +20,7 @@ impl Plan { | Node::Relational(_) | Node::Ddl(..) | Node::Acl(..) + | Node::Tcl(..) | Node::Invalid(..) | Node::Plugin(_) | Node::Deallocate(..) @@ -44,6 +45,7 @@ impl Plan { | MutNode::Relational(_) | MutNode::Ddl(..) | MutNode::Acl(..) + | MutNode::Tcl(..) | MutNode::Invalid(..) | MutNode::Plugin(_) | MutNode::Deallocate(..) diff --git a/sbroad/sbroad-core/src/ir/expression/types.rs b/sbroad/sbroad-core/src/ir/expression/types.rs index cb7d84e8da15c6a18500351ab59cb31217e4c09f..eb4cd7071be66a9c93dafc0e99b57f0a1f373a95 100644 --- a/sbroad/sbroad-core/src/ir/expression/types.rs +++ b/sbroad/sbroad-core/src/ir/expression/types.rs @@ -23,25 +23,29 @@ impl Plan { // Parameter nodes must recalculate their type during // binding (see `bind_params` function). Node::Parameter(ty) => Ok(ty.param_type.unwrap_or(Type::Scalar)), - Node::Ddl(_) => Err(SbroadError::Invalid( + Node::Ddl(ddl) => Err(SbroadError::Invalid( Entity::Node, - Some("DDL node has no type".to_smolstr()), + Some(format_smolstr!("DDL node {ddl:?} has no type")), )), - Node::Acl(_) => Err(SbroadError::Invalid( + Node::Acl(acl) => Err(SbroadError::Invalid( Entity::Node, - Some("ACL node has no type".to_smolstr()), + Some(format_smolstr!("ACL node {acl:?} has no type")), )), - Node::Invalid(_) => Err(SbroadError::Invalid( + Node::Tcl(tcl) => Err(SbroadError::Invalid( Entity::Node, - Some("Invalid node has no type".to_smolstr()), + Some(format_smolstr!("TCL node {tcl:?} has no type")), )), - Node::Block(_) => Err(SbroadError::Invalid( + Node::Invalid(invalid) => Err(SbroadError::Invalid( Entity::Node, - Some("code block node has no type".to_smolstr()), + Some(format_smolstr!("Invalid node {invalid:?} has no type")), )), - Node::Plugin(_) => Err(SbroadError::Invalid( + Node::Plugin(plugin) => Err(SbroadError::Invalid( Entity::Node, - Some("Plugin node has no type".to_smolstr()), + Some(format_smolstr!("Plugin node {plugin:?} has no type")), + )), + Node::Block(block) => Err(SbroadError::Invalid( + Entity::Node, + Some(format_smolstr!("Block node {block:?} has no type")), )), Node::Deallocate(_) => Err(SbroadError::Invalid( Entity::Node, diff --git a/sbroad/sbroad-core/src/ir/node.rs b/sbroad/sbroad-core/src/ir/node.rs index 26d2af47a63915f57053b0d44ae717efd8098dea..8929fa9f605954a3020793923722d3f1fb96213d 100644 --- a/sbroad/sbroad-core/src/ir/node.rs +++ b/sbroad/sbroad-core/src/ir/node.rs @@ -13,6 +13,7 @@ use tarantool::{ index::{IndexType, RtreeIndexDistanceType}, space::SpaceEngineType, }; +use tcl::Tcl; use super::{ ddl::AlterSystemType, @@ -40,6 +41,7 @@ pub mod deallocate; pub mod expression; pub mod plugin; pub mod relational; +pub mod tcl; #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Hash, Copy)] pub enum ArenaType { @@ -1064,6 +1066,7 @@ pub enum Node32 { UnionAll(UnionAll), Values(Values), Deallocate(Deallocate), + Tcl(Tcl), } impl Node32 { @@ -1093,6 +1096,11 @@ impl Node32 { Node32::UnionAll(union_all) => NodeOwned::Relational(RelOwned::UnionAll(union_all)), Node32::Values(values) => NodeOwned::Relational(RelOwned::Values(values)), Node32::Deallocate(deallocate) => NodeOwned::Deallocate(deallocate), + Node32::Tcl(tcl) => match tcl { + Tcl::Begin => NodeOwned::Tcl(Tcl::Begin), + Tcl::Commit => NodeOwned::Tcl(Tcl::Commit), + Tcl::Rollback => NodeOwned::Tcl(Tcl::Rollback), + }, } } } @@ -1326,6 +1334,7 @@ pub enum Node<'nodes> { Invalid(&'nodes Invalid), Plugin(Plugin<'nodes>), Deallocate(&'nodes Deallocate), + Tcl(Tcl), } #[allow(clippy::module_name_repetitions)] @@ -1340,6 +1349,7 @@ pub enum MutNode<'nodes> { Invalid(&'nodes mut Invalid), Plugin(MutPlugin<'nodes>), Deallocate(&'nodes mut Deallocate), + Tcl(Tcl), } impl Node<'_> { @@ -1355,6 +1365,7 @@ impl Node<'_> { Node::Invalid(inv) => NodeOwned::Invalid((*inv).clone()), Node::Plugin(plugin) => NodeOwned::Plugin(plugin.get_plugin_owned()), Node::Deallocate(deallocate) => NodeOwned::Deallocate((*deallocate).clone()), + Node::Tcl(tcl) => NodeOwned::Tcl(tcl), } } } @@ -1372,6 +1383,7 @@ pub enum NodeOwned { Invalid(Invalid), Plugin(PluginOwned), Deallocate(Deallocate), + Tcl(Tcl), } impl From<NodeOwned> for NodeAligned { @@ -1386,6 +1398,7 @@ impl From<NodeOwned> for NodeAligned { NodeOwned::Relational(rel) => rel.into(), NodeOwned::Plugin(p) => p.into(), NodeOwned::Deallocate(d) => d.into(), + NodeOwned::Tcl(tcl) => tcl.into(), } } } diff --git a/sbroad/sbroad-core/src/ir/node/tcl.rs b/sbroad/sbroad-core/src/ir/node/tcl.rs new file mode 100644 index 0000000000000000000000000000000000000000..3b25ea44efd60d3ff4ff8c03107895be3af26ae7 --- /dev/null +++ b/sbroad/sbroad-core/src/ir/node/tcl.rs @@ -0,0 +1,60 @@ +use super::{NodeAligned, NodeOwned}; +use crate::ir::{Entity, Node, Node32, NodeId, Plan, SbroadError}; +use serde::{Deserialize, Serialize}; +use smol_str::format_smolstr; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum Tcl { + Begin, + Commit, + Rollback, +} + +impl Tcl { + /// Returns a string slice representing the `Tcl` variant. + pub fn as_str(&self) -> &str { + match self { + Tcl::Begin => "Begin", + Tcl::Commit => "Commit", + Tcl::Rollback => "Rollback", + } + } +} + +impl From<Tcl> for NodeAligned { + fn from(tcl: Tcl) -> Self { + match tcl { + Tcl::Begin => Self::Node32(Node32::Tcl(Tcl::Begin)), + Tcl::Commit => Self::Node32(Node32::Tcl(Tcl::Commit)), + Tcl::Rollback => Self::Node32(Node32::Tcl(Tcl::Rollback)), + } + } +} + +impl From<Tcl> for NodeOwned { + fn from(tcl: Tcl) -> Self { + match tcl { + Tcl::Begin => NodeOwned::Tcl(Tcl::Begin), + Tcl::Commit => NodeOwned::Tcl(Tcl::Commit), + Tcl::Rollback => NodeOwned::Tcl(Tcl::Rollback), + } + } +} + +impl Plan { + /// Get TCL node from the plan arena. + /// + /// # Errors + /// - the node index is absent in arena + /// - current node is not of TCL type + pub fn get_tcl_node(&self, node_id: NodeId) -> Result<Tcl, SbroadError> { + let node = self.get_node(node_id)?; + match node { + Node::Tcl(tcl) => Ok(tcl), + _ => Err(SbroadError::Invalid( + Entity::Node, + Some(format_smolstr!("node is not TCL type: {node:?}")), + )), + } + } +} diff --git a/sbroad/sbroad-core/src/ir/tree/expression.rs b/sbroad/sbroad-core/src/ir/tree/expression.rs index 2cf20ae2abcba4431901082a67aa02b6eddbd6ca..23226e2fedce3380de89f63e5135a8506dd9f6e9 100644 --- a/sbroad/sbroad-core/src/ir/tree/expression.rs +++ b/sbroad/sbroad-core/src/ir/tree/expression.rs @@ -171,6 +171,7 @@ fn expression_next<'nodes>(iter: &mut impl ExpressionTreeIterator<'nodes>) -> Op Node::Acl(_) | Node::Block(_) | Node::Ddl(_) + | Node::Tcl(_) | Node::Relational(_) | Node::Invalid(_) | Node::Plugin(_) diff --git a/sbroad/sbroad-core/src/ir/tree/relation.rs b/sbroad/sbroad-core/src/ir/tree/relation.rs index a269b13506387110ab81ef94d070b0955a4313e6..e2532e0ddfa84ff05423923652832d17c6e60cb6 100644 --- a/sbroad/sbroad-core/src/ir/tree/relation.rs +++ b/sbroad/sbroad-core/src/ir/tree/relation.rs @@ -108,6 +108,7 @@ fn relational_next<'nodes>(iter: &mut impl RelationalTreeIterator<'nodes>) -> Op | Node::Invalid(_) | Node::Ddl(_) | Node::Acl(_) + | Node::Tcl(_) | Node::Block(_) | Node::Plugin(_) | Node::Deallocate(_) => None, diff --git a/sbroad/sbroad-core/src/ir/tree/subtree.rs b/sbroad/sbroad-core/src/ir/tree/subtree.rs index fc52d2e39c10cad24e296c81157ceae1a72256ad..84a0d39906dce6483487a0ea9e02aef19862a5a5 100644 --- a/sbroad/sbroad-core/src/ir/tree/subtree.rs +++ b/sbroad/sbroad-core/src/ir/tree/subtree.rs @@ -205,6 +205,7 @@ fn subtree_next<'plan>( | Node::Parameter(..) | Node::Ddl(..) | Node::Acl(..) + | Node::Tcl(..) | Node::Block(..) | Node::Plugin(..) | Node::Deallocate(..) => None, diff --git a/src/pgproto/backend.rs b/src/pgproto/backend.rs index 1270e80776b2178de6fd4d47fab78f5477ef52c2..156b465970b1a4ceb65e0c5e0e22200773a14a40 100644 --- a/src/pgproto/backend.rs +++ b/src/pgproto/backend.rs @@ -142,6 +142,8 @@ pub fn bind( && !plan.is_acl()? && !plan.is_plugin()? && !plan.is_deallocate()? + && !plan.is_tcl()? + && !plan.is_plugin()? { plan.bind_params(params)?; plan.apply_options()?; @@ -186,7 +188,7 @@ pub fn parse(cid: ClientId, name: String, query: &str, param_oids: Vec<Oid>) -> Ok(plan) }) .map_err(|e| PgError::Other(e.into()))??; - if !plan.is_empty() && !plan.is_ddl()? && !plan.is_acl()? { + if !plan.is_empty() && !plan.is_tcl()? && !plan.is_ddl()? && !plan.is_acl()? { cache.put(query.into(), plan.clone())?; } let statement = Statement::new(plan, param_oids)?; diff --git a/src/pgproto/backend/describe.rs b/src/pgproto/backend/describe.rs index 29231524c35b0d3953156b128ec2705a44e885e9..655c7e3b309314e1ec4c093fb0f4caa67e98cfea 100644 --- a/src/pgproto/backend/describe.rs +++ b/src/pgproto/backend/describe.rs @@ -13,7 +13,7 @@ use sbroad::{ acl::GrantRevokeType, node::{ acl::Acl, block::Block, ddl::Ddl, expression::Expression, plugin::Plugin, - relational::Relational, Alias, GrantPrivilege, Node, RevokePrivilege, + relational::Relational, tcl::Tcl, Alias, GrantPrivilege, Node, RevokePrivilege, }, relation::Type as SbroadType, Plan, @@ -37,6 +37,7 @@ pub enum QueryType { Dql = 3, Explain = 4, Empty = 5, + Tcl = 6, Deallocate = 7, } @@ -46,6 +47,7 @@ pub enum CommandTag { AddTrier = 37, AlterRole = 0, AlterSystem = 22, + Begin = 52, CallProcedure = 16, CreateProcedure = 14, CreateRole = 1, @@ -53,6 +55,7 @@ pub enum CommandTag { CreateIndex = 18, CreatePlugin = 31, ChangeConfig = 39, + Commit = 53, DropProcedure = 15, DropRole = 3, DropTable = 4, @@ -72,6 +75,7 @@ pub enum CommandTag { RemoveTier = 38, RenameRoutine = 17, Revoke = 10, + Rollback = 54, RevokeRole = 11, #[default] Select = 12, @@ -123,6 +127,9 @@ impl CommandTag { Self::AddTrier => "ALTER PLUGIN ADD SERVICE TO TIER", Self::RemoveTier => "ALTER PLUGIN REMOVE SERVICE FROM TIER", Self::ChangeConfig => "ALTER PLUGIN SET", + Self::Begin => "BEGIN", + Self::Commit => "COMMIT", + Self::Rollback => "ROLLBACK", // Response on an empty query is EmptyQueryResponse with no tag. // https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-EMPTYQUERYRESPONSE Self::EmptyQuery => "", @@ -165,6 +172,7 @@ impl From<CommandTag> for QueryType { CommandTag::Explain => QueryType::Explain, CommandTag::Select => QueryType::Dql, CommandTag::Deallocate | CommandTag::DeallocateAll => QueryType::Deallocate, + CommandTag::Begin | CommandTag::Commit | CommandTag::Rollback => QueryType::Tcl, CommandTag::EmptyQuery => QueryType::Empty, } } @@ -203,6 +211,11 @@ impl TryFrom<&Node<'_>> for CommandTag { Ddl::SetParam { .. } => Ok(CommandTag::SetParam), Ddl::SetTransaction { .. } => Ok(CommandTag::SetTransaction), }, + Node::Tcl(tcl) => match tcl { + Tcl::Begin => Ok(CommandTag::Begin), + Tcl::Commit => Ok(CommandTag::Commit), + Tcl::Rollback => Ok(CommandTag::Rollback), + }, Node::Plugin(plugin) => match plugin { Plugin::Create { .. } => Ok(CommandTag::CreatePlugin), Plugin::Drop { .. } => Ok(CommandTag::DropPlugin), @@ -397,6 +410,7 @@ impl Describe { QueryType::Dql => Ok(Describe::default() .with_command_tag(command_tag) .with_metadata(dql_output_format(plan)?)), + QueryType::Tcl => Ok(Describe::default().with_command_tag(command_tag)), QueryType::Explain => Ok(Describe::default() .with_command_tag(command_tag) .with_metadata(explain_output_format())), @@ -420,6 +434,7 @@ impl Describe { | QueryType::Ddl | QueryType::Dml | QueryType::Deallocate + | QueryType::Tcl | QueryType::Empty => None, QueryType::Dql | QueryType::Explain => { let row_description = self @@ -507,6 +522,7 @@ impl PortalDescribe { | QueryType::Ddl | QueryType::Dml | QueryType::Deallocate + | QueryType::Tcl | QueryType::Empty => None, QueryType::Dql | QueryType::Explain => { let metadata = &self.describe.metadata; diff --git a/src/pgproto/backend/pgproc.rs b/src/pgproto/backend/pgproc.rs index 57d01b89282b21b15522f9e8536aedb616da57e4..de8d0c984e80b7bbbbf47cca9fa9fe7504721611 100644 --- a/src/pgproto/backend/pgproc.rs +++ b/src/pgproto/backend/pgproc.rs @@ -79,7 +79,10 @@ pub fn proc_pg_describe_portal(id: ClientId, name: String) -> PgResult<PortalDes pub fn proc_pg_execute(id: ClientId, name: String, max_rows: i64) -> PgResult<Tuple> { let result = backend::execute(id, name, max_rows)?; let bytes = match &result { - ExecuteResult::AclOrDdl { .. } | ExecuteResult::Dml { .. } | ExecuteResult::Empty => { + ExecuteResult::AclOrDdl { .. } + | ExecuteResult::Dml { .. } + | ExecuteResult::Tcl { .. } + | ExecuteResult::Empty => { let row_count = if let ExecuteResult::Dml { row_count, .. } = result { Some(row_count) } else { diff --git a/src/pgproto/backend/result.rs b/src/pgproto/backend/result.rs index d89d7bf26ba6514f3fdcdcc442db14112b43b366..d5635d9409467994c050139791069d7b436c1591 100644 --- a/src/pgproto/backend/result.rs +++ b/src/pgproto/backend/result.rs @@ -53,6 +53,10 @@ pub enum ExecuteResult { tag: CommandTag, row_count: usize, }, + Tcl { + /// Tag of the command. + tag: CommandTag, + }, SuspendedDql { /// Rows we'll send to the client. rows: Rows, diff --git a/src/pgproto/backend/storage.rs b/src/pgproto/backend/storage.rs index 0b81c07a349502f4b98bc6759fd869b4fe31047c..b3fcfd48807181b2f534acea198e20fa8364d5c3 100644 --- a/src/pgproto/backend/storage.rs +++ b/src/pgproto/backend/storage.rs @@ -501,6 +501,10 @@ impl Portal { let tag = self.describe().command_tag(); PortalState::ResultReady(ExecuteResult::AclOrDdl { tag }) } + QueryType::Tcl => { + let tag = self.describe().command_tag(); + PortalState::ResultReady(ExecuteResult::Tcl { tag }) + } QueryType::Dml => { let row_count = get_row_count_from_tuple(&tuple)?; let tag = self.describe().command_tag(); diff --git a/src/pgproto/client/extended_query.rs b/src/pgproto/client/extended_query.rs index d5067bd9bcd8b2b1bfa077f359706801b28cdbee..81d4afb028cd6cb8a7279a3baf072a254d23f6a3 100644 --- a/src/pgproto/client/extended_query.rs +++ b/src/pgproto/client/extended_query.rs @@ -45,6 +45,9 @@ pub fn process_execute_message( ExecuteResult::AclOrDdl { tag } => { stream.write_message_noflush(messages::command_complete(&tag))?; } + ExecuteResult::Tcl { tag } => { + stream.write_message_noflush(messages::command_complete(&tag))?; + } ExecuteResult::Dml { tag, row_count } => { stream.write_message_noflush(messages::command_complete_with_row_count( &tag, row_count, diff --git a/src/pgproto/client/simple_query.rs b/src/pgproto/client/simple_query.rs index f735c7e8d2043503a30c48723cb1b3a58c171436..6139120ffde978e17ca2a4e58f53005b4772136d 100644 --- a/src/pgproto/client/simple_query.rs +++ b/src/pgproto/client/simple_query.rs @@ -14,6 +14,9 @@ pub fn process_query_message( ExecuteResult::AclOrDdl { tag } => { stream.write_message(messages::command_complete(&tag))?; } + ExecuteResult::Tcl { tag } => { + stream.write_message(messages::command_complete(&tag))?; + } ExecuteResult::Dml { tag, row_count } => { stream.write_message(messages::command_complete_with_row_count(&tag, row_count))?; } diff --git a/src/sql.rs b/src/sql.rs index 68d6cbbcf45564a6bcbc8485d25a41ea42eff4c1..11ca2985af7ec96b9101cfbfd726e036f1219018 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -222,6 +222,18 @@ pub fn dispatch(mut query: Query<RouterRuntime>) -> traft::Result<Tuple> { return empty_query_response(); } + if query.is_tcl()? { + let ir_plan = query.get_exec_plan().get_ir_plan(); + let top_id = ir_plan.get_top()?; + let tcl = ir_plan.get_tcl_node(top_id)?; + tlog!( + Warning, + "Transactions are currently unsupported. Empty query response provided for {}.", + tcl.as_str() + ); + return empty_query_response(); + } + if query.is_ddl()? || query.is_acl()? { let ir_plan = query.get_exec_plan().get_ir_plan(); let top_id = ir_plan.get_top()?; diff --git a/test/pgproto/extended_query_test.py b/test/pgproto/extended_query_test.py index 483a14591e14dcc7df8b08837b04335469dabfcb..128a67faf2815cd30414ad39abe80906ba166f16 100644 --- a/test/pgproto/extended_query_test.py +++ b/test/pgproto/extended_query_test.py @@ -354,3 +354,76 @@ def test_deallocate(postgres: Postgres): ps2.run() conn.close() + + +def test_tcl(postgres: Postgres): + user = "admin" + password = "P@ssw0rd" + host = postgres.host + port = postgres.port + postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'") + + conn = psycopg.connect( + f"user={user} password={password} host={host} port={port} sslmode=disable" + ) + # With autocommit + conn.autocommit = True + + cur = conn.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name TEXT);") + + cur = conn.execute( + "INSERT INTO test_table (id, name) VALUES (1,'Alice'), (2,'Bob');" + ) + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert rows == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("DROP TABLE test_table;") + + # Without autocommit + conn.autocommit = False + + cur = conn.execute("BEGIN;", prepare=True) + assert cur.pgresult is not None + assert cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name TEXT);") + + cur = conn.execute( + "INSERT INTO test_table (id, name) VALUES (1,'Alice'), (2,'Bob');" + ) + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert rows == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("ROLLBACK;", prepare=True) + assert cur.pgresult is not None + assert cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert rows == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("DROP TABLE test_table;") + + cur = conn.execute("BEGIN;", prepare=True) + assert cur.pgresult is not None + assert cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name TEXT);") + + cur = conn.execute( + "INSERT INTO test_table (id, name) VALUES (1,'Alice'), (2,'Bob');" + ) + + cur = conn.execute("COMMIT;", prepare=True) + assert cur.pgresult is not None + cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert rows == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("DROP TABLE test_table;") diff --git a/test/pgproto/simple_query_test.py b/test/pgproto/simple_query_test.py index c547c4ea46454724909e1c8ecb65c38afa7232ee..67300f2d9f6963507287ef99c5089b5b5052e476 100644 --- a/test/pgproto/simple_query_test.py +++ b/test/pgproto/simple_query_test.py @@ -287,3 +287,76 @@ def test_deallocate(postgres: Postgres): ps2.run() conn.close() + + +def test_tcl(postgres: Postgres): + user = "admin" + password = "P@ssw0rd" + host = postgres.host + port = postgres.port + postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'") + + conn = psycopg.connect( + f"user={user} password={password} host={host} port={port} sslmode=disable" + ) + # With autocommit + conn.autocommit = True + + cur = conn.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name TEXT);") + + cur = conn.execute( + "INSERT INTO test_table (id, name) VALUES (1,'Alice'), (2,'Bob');" + ) + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert sorted(rows, key=lambda x: x[0]) == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("DROP TABLE test_table;") + + # Without autocommit + conn.autocommit = False + + cur = conn.execute("BEGIN;", prepare=False) + assert cur.pgresult is not None + assert cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name TEXT);") + + cur = conn.execute( + "INSERT INTO test_table (id, name) VALUES (1,'Alice'), (2,'Bob');" + ) + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert sorted(rows, key=lambda x: x[0]) == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("ROLLBACK;", prepare=False) + assert cur.pgresult is not None + assert cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert sorted(rows, key=lambda x: x[0]) == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("DROP TABLE test_table;") + + cur = conn.execute("BEGIN;", prepare=False) + assert cur.pgresult is not None + assert cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name TEXT);") + + cur = conn.execute( + "INSERT INTO test_table (id, name) VALUES (1,'Alice'), (2,'Bob');" + ) + + cur = conn.execute("COMMIT;", prepare=False) + assert cur.pgresult is not None + cur.pgresult.status == ExecStatus.COMMAND_OK + + cur = conn.execute("SELECT * FROM test_table;") + rows = cur.fetchall() + assert sorted(rows, key=lambda x: x[0]) == [(1, "Alice"), (2, "Bob")] + + cur = conn.execute("DROP TABLE test_table;")