From 433a15b6316534619b5076a8d50ccdf4d12070fc Mon Sep 17 00:00:00 2001
From: Erik Khamitov <e.khamitov@picodata.io>
Date: Wed, 11 Dec 2024 12:59:01 +0300
Subject: [PATCH] feat(sql): add DEALLOCATE support(removes prepared
 statements)

---
 .../test_app/test/integration/deallocate.lua  | 37 ++++++++++
 sbroad/sbroad-core/src/backend/sql/ir.rs      |  9 +++
 sbroad/sbroad-core/src/backend/sql/tree.rs    |  6 +-
 sbroad/sbroad-core/src/executor.rs            | 14 +++-
 sbroad/sbroad-core/src/executor/ir.rs         |  1 +
 sbroad/sbroad-core/src/frontend/sql.rs        | 15 ++++
 .../sbroad-core/src/frontend/sql/query.pest   |  6 +-
 sbroad/sbroad-core/src/ir.rs                  | 21 ++++--
 sbroad/sbroad-core/src/ir/api/parameter.rs    |  6 +-
 sbroad/sbroad-core/src/ir/block.rs            |  2 +
 sbroad/sbroad-core/src/ir/expression/types.rs |  4 ++
 sbroad/sbroad-core/src/ir/node.rs             |  9 +++
 sbroad/sbroad-core/src/ir/node/deallocate.rs  | 42 +++++++++++
 sbroad/sbroad-core/src/ir/tree/expression.rs  |  1 +
 sbroad/sbroad-core/src/ir/tree/relation.rs    |  3 +-
 sbroad/sbroad-core/src/ir/tree/subtree.rs     |  3 +-
 src/pgproto/backend.rs                        | 22 +++++-
 src/pgproto/backend/describe.rs               | 26 ++++++-
 src/pgproto/backend/storage.rs                | 16 +++++
 src/sql.rs                                    |  4 ++
 test/pgproto/extended_query_test.py           | 72 +++++++++++++++++++
 test/pgproto/simple_query_test.py             | 63 ++++++++++++++++
 22 files changed, 366 insertions(+), 16 deletions(-)
 create mode 100644 sbroad/sbroad-cartridge/test_app/test/integration/deallocate.lua
 create mode 100644 sbroad/sbroad-core/src/ir/node/deallocate.rs

diff --git a/sbroad/sbroad-cartridge/test_app/test/integration/deallocate.lua b/sbroad/sbroad-cartridge/test_app/test/integration/deallocate.lua
new file mode 100644
index 0000000000..a385ce548b
--- /dev/null
+++ b/sbroad/sbroad-cartridge/test_app/test/integration/deallocate.lua
@@ -0,0 +1,37 @@
+local t = require('luatest')
+local g = t.group('integration_api.deallocate')
+
+local helper = require('test.helper.cluster_no_replication')
+
+g.after_all(function()
+    helper.stop_test_cluster()
+end)
+
+g.test_deallocate = function()
+    local r, err
+    local api = helper.cluster:server("api-1").net_box
+
+    r, err = api:call("sbroad.execute", { [[DEALLOCATE name]] })
+    t.assert_equals(err, nil)
+    t.assert_equals(r["row_count"], 0)
+
+    r, err = api:call("sbroad.execute", { [[DEALLOCATE ALL]] })
+    t.assert_equals(err, nil)
+    t.assert_equals(r["row_count"], 0)
+
+    r, err = api:call("sbroad.execute", { [[DEALLOCATE prepare]] })
+    t.assert_equals(err, nil)
+    t.assert_equals(r["row_count"], 0)
+
+    r, err = api:call("sbroad.execute", { [[DEALLOCATE PREPARE prepare]] })
+    t.assert_equals(err, nil)
+    t.assert_equals(r["row_count"], 0)
+
+    r, err = api:call("sbroad.execute", { [[DEALLOCATE PREPARE name]] })
+    t.assert_equals(err, nil)
+    t.assert_equals(r["row_count"], 0)
+
+    r, err = api:call("sbroad.execute", { [[DEALLOCATE PREPARE ALL]] })
+    t.assert_equals(err, nil)
+    t.assert_equals(r["row_count"], 0)
+end
diff --git a/sbroad/sbroad-core/src/backend/sql/ir.rs b/sbroad/sbroad-core/src/backend/sql/ir.rs
index 39b7ba0d71..c6f0f76b65 100644
--- a/sbroad/sbroad-core/src/backend/sql/ir.rs
+++ b/sbroad/sbroad-core/src/backend/sql/ir.rs
@@ -345,6 +345,15 @@ impl ExecutionPlan {
                                 Some("Plugin are not supported in the generated SQL".into()),
                             ));
                         }
+                        Node::Deallocate(_) => {
+                            return Err(SbroadError::Unsupported(
+                                Entity::Node,
+                                Some(
+                                    "Deallocate nodes are not supported in the generated SQL"
+                                        .into(),
+                                ),
+                            ));
+                        }
                         Node::Relational(rel) => match rel {
                             Relational::Except { .. } => sql.push_str("EXCEPT"),
                             Relational::GroupBy { .. } => sql.push_str("GROUP BY"),
diff --git a/sbroad/sbroad-core/src/backend/sql/tree.rs b/sbroad/sbroad-core/src/backend/sql/tree.rs
index b917fd41f9..06b2706b3c 100644
--- a/sbroad/sbroad-core/src/backend/sql/tree.rs
+++ b/sbroad/sbroad-core/src/backend/sql/tree.rs
@@ -767,7 +767,11 @@ impl<'p> SyntaxPlan<'p> {
             .get_node(id)
             .expect("node {id} must exist in the plan");
         match node {
-            Node::Plugin(..) | Node::Ddl(..) | Node::Acl(..) | Node::Block(..) => {
+            Node::Deallocate(..)
+            | Node::Plugin(..)
+            | Node::Ddl(..)
+            | Node::Acl(..)
+            | Node::Block(..) => {
                 panic!("Node {node:?} is not supported in the syntax plan")
             }
             Node::Invalid(..) | Node::Parameter(..) => {
diff --git a/sbroad/sbroad-core/src/executor.rs b/sbroad/sbroad-core/src/executor.rs
index d344e7b16e..ef9575d34a 100644
--- a/sbroad/sbroad-core/src/executor.rs
+++ b/sbroad/sbroad-core/src/executor.rs
@@ -159,7 +159,11 @@ where
 
         if plan.is_block()? {
             plan.bind_params(params)?;
-        } else if !plan.is_ddl()? && !plan.is_acl()? && !plan.is_plugin()? {
+        } else if !plan.is_ddl()?
+            && !plan.is_acl()?
+            && !plan.is_plugin()?
+            && !plan.is_deallocate()?
+        {
             plan.bind_params(params)?;
             plan.apply_options()?;
             plan.optimize()?;
@@ -398,6 +402,14 @@ where
         self.exec_plan.get_ir_plan().is_plugin()
     }
 
+    /// Checks that query is Deallocate.
+    ///
+    /// # Errors
+    /// - Plan is invalid
+    pub fn is_deallocate(&self) -> Result<bool, SbroadError> {
+        self.exec_plan.get_ir_plan().is_deallocate()
+    }
+
     /// Checks that query is an empty query.
     pub fn is_empty(&self) -> bool {
         self.exec_plan.get_ir_plan().is_empty()
diff --git a/sbroad/sbroad-core/src/executor/ir.rs b/sbroad/sbroad-core/src/executor/ir.rs
index 60e1575e9c..701fe4e416 100644
--- a/sbroad/sbroad-core/src/executor/ir.rs
+++ b/sbroad/sbroad-core/src/executor/ir.rs
@@ -814,6 +814,7 @@ impl ExecutionPlan {
                 | NodeOwned::Ddl { .. }
                 | NodeOwned::Acl { .. }
                 | NodeOwned::Plugin { .. }
+                | NodeOwned::Deallocate { .. }
                 | NodeOwned::Block { .. } => {
                     panic!("Unexpected node in `take_subtree`: {node:?}")
                 }
diff --git a/sbroad/sbroad-core/src/frontend/sql.rs b/sbroad/sbroad-core/src/frontend/sql.rs
index 153210b0a4..281b28b259 100644
--- a/sbroad/sbroad-core/src/frontend/sql.rs
+++ b/sbroad/sbroad-core/src/frontend/sql.rs
@@ -3,6 +3,7 @@
 //! Parses an SQL statement to the abstract syntax tree (AST)
 //! and builds the intermediate representation (IR).
 
+use crate::ir::node::deallocate::Deallocate;
 use crate::ir::node::{Reference, ReferenceAsteriskSource};
 use ahash::{AHashMap, AHashSet};
 use core::panic;
@@ -1030,6 +1031,15 @@ fn parse_set_param(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<SetPara
     })
 }
 
+fn parse_deallocate(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<Deallocate, SbroadError> {
+    let param_name = if let Some(identifier_node_id) = node.children.first() {
+        Some(parse_identifier(ast, *identifier_node_id)?)
+    } else {
+        None
+    };
+    Ok(Deallocate { name: param_name })
+}
+
 fn parse_select_full(
     ast: &AbstractSyntaxTree,
     node_id: usize,
@@ -4591,6 +4601,11 @@ impl AbstractSyntaxTree {
                     let plan_id = plan.nodes.push(set_transaction_node.into());
                     map.add(id, plan_id);
                 }
+                Rule::Deallocate => {
+                    let deallocate = parse_deallocate(self, node)?;
+                    let plan_id = plan.nodes.push(deallocate.into());
+                    map.add(id, plan_id);
+                }
                 _ => {}
             }
         }
diff --git a/sbroad/sbroad-core/src/frontend/sql/query.pest b/sbroad/sbroad-core/src/frontend/sql/query.pest
index 5a8eb77e6f..32c1b4eec6 100644
--- a/sbroad/sbroad-core/src/frontend/sql/query.pest
+++ b/sbroad/sbroad-core/src/frontend/sql/query.pest
@@ -1,4 +1,4 @@
-Command = _{ SOI ~ (Query | ExplainQuery | Block | DDL | ACL | Plugin | EmptyQuery) ~ EOF }
+Command = _{ SOI ~ (Query | ExplainQuery | Block | DDL | ACL | Plugin | Deallocate | EmptyQuery) ~ EOF }
 
 // Helper rule to denote we have to update plan relations from metadata
 // (with Table which name corresponds to current node).
@@ -310,6 +310,10 @@ Identifier = @{ DelimitedIdentifier | RegularIdentifier  }
                         | ^"values" | ^"varchar" | ^"when" | ^"where" | ^"with"
                         }
 
+Deallocate = ${ ^"deallocate" ~ (WithPrepare | WithoutPrepare) }
+    WithPrepare = _{ W ~ ^"prepare" ~ W ~ (^"all" | Identifier) }
+    WithoutPrepare = _{ W ~ (^"all" | Identifier) }
+
 EmptyQuery = { WHITESPACE* }
 
 //  `select (true)between(false)and(true)` query is valid!!! :(
diff --git a/sbroad/sbroad-core/src/ir.rs b/sbroad/sbroad-core/src/ir.rs
index 8d4976f4bc..511dd4321a 100644
--- a/sbroad/sbroad-core/src/ir.rs
+++ b/sbroad/sbroad-core/src/ir.rs
@@ -8,6 +8,8 @@ use node::ddl::{Ddl, MutDdl};
 use node::expression::{Expression, MutExpression};
 use node::relational::{MutRelational, Relational};
 use node::{Invalid, NodeAligned};
+use operator::Arithmetic;
+use relation::{Table, Type};
 use serde::{Deserialize, Serialize};
 use smol_str::{format_smolstr, SmolStr, ToSmolStr};
 use std::cell::{RefCell, RefMut};
@@ -16,9 +18,6 @@ use std::fmt::{Display, Formatter};
 use std::slice::Iter;
 use tree::traversal::LevelNode;
 
-use operator::Arithmetic;
-use relation::{Table, Type};
-
 use self::parameters::Parameters;
 use self::relation::Relations;
 use self::transformation::redistribution::MotionPolicy;
@@ -104,6 +103,7 @@ impl Nodes {
                 Node32::Union(un) => Node::Relational(Relational::Union(un)),
                 Node32::UnionAll(union_all) => Node::Relational(Relational::UnionAll(union_all)),
                 Node32::Values(values) => Node::Relational(Relational::Values(values)),
+                Node32::Deallocate(deallocate) => Node::Deallocate(deallocate),
             }),
             ArenaType::Arena64 => self.arena64.get(id.offset as usize).map(|node| match node {
                 Node64::Case(case) => Node::Expression(Expression::Case(case)),
@@ -229,6 +229,7 @@ impl Nodes {
                         MutNode::Relational(MutRelational::SelectWithoutScan(select))
                     }
                     Node32::Values(values) => MutNode::Relational(MutRelational::Values(values)),
+                    Node32::Deallocate(deallocate) => MutNode::Deallocate(deallocate),
                 }),
             ArenaType::Arena64 => self
                 .arena64
@@ -1311,6 +1312,15 @@ impl Plan {
         Ok(matches!(self.get_node(top_id)?, Node::Plugin(_)))
     }
 
+    /// Checks that plan is a deallocate query.
+    ///
+    /// # Errors
+    /// - top node doesn't exist in the plan or is invalid.
+    pub fn is_deallocate(&self) -> Result<bool, SbroadError> {
+        let top_id = self.get_top()?;
+        Ok(matches!(self.get_node(top_id)?, Node::Deallocate(_)))
+    }
+
     /// Set top node of plan
     /// # Errors
     /// - top node doesn't exist in the plan.
@@ -1335,7 +1345,8 @@ impl Plan {
             | Node::Invalid(..)
             | Node::Acl(..)
             | Node::Block(..)
-            | Node::Plugin(..) => Err(SbroadError::Invalid(
+            | Node::Plugin(..)
+            | Node::Deallocate(..) => Err(SbroadError::Invalid(
                 Entity::Node,
                 Some(format_smolstr!("node is not Relational type: {node:?}")),
             )),
@@ -1356,6 +1367,7 @@ impl Plan {
             | MutNode::Invalid(..)
             | MutNode::Acl(..)
             | MutNode::Plugin(..)
+            | MutNode::Deallocate(..)
             | MutNode::Block(..) => Err(SbroadError::Invalid(
                 Entity::Node,
                 Some("Node is not relational".into()),
@@ -1415,6 +1427,7 @@ impl Plan {
             | MutNode::Invalid(..)
             | MutNode::Acl(..)
             | MutNode::Plugin(..)
+            | MutNode::Deallocate(..)
             | MutNode::Block(..) => Err(SbroadError::Invalid(
                 Entity::Node,
                 Some(format_smolstr!(
diff --git a/sbroad/sbroad-core/src/ir/api/parameter.rs b/sbroad/sbroad-core/src/ir/api/parameter.rs
index 374b2be38d..ed3d608cc0 100644
--- a/sbroad/sbroad-core/src/ir/api/parameter.rs
+++ b/sbroad/sbroad-core/src/ir/api/parameter.rs
@@ -371,7 +371,8 @@ impl<'binder> ParamsBinder<'binder> {
                 | Node::Parameter(..)
                 | Node::Ddl(..)
                 | Node::Acl(..)
-                | Node::Plugin(_) => {}
+                | Node::Plugin(_)
+                | Node::Deallocate(..) => {}
             }
         }
 
@@ -557,7 +558,8 @@ impl<'binder> ParamsBinder<'binder> {
                 | MutNode::Parameter(..)
                 | MutNode::Ddl(..)
                 | MutNode::Plugin(_)
-                | MutNode::Acl(..) => {}
+                | MutNode::Acl(..)
+                | MutNode::Deallocate(..) => {}
             }
         }
 
diff --git a/sbroad/sbroad-core/src/ir/block.rs b/sbroad/sbroad-core/src/ir/block.rs
index 1891ac1752..a6ffa29aec 100644
--- a/sbroad/sbroad-core/src/ir/block.rs
+++ b/sbroad/sbroad-core/src/ir/block.rs
@@ -22,6 +22,7 @@ impl Plan {
             | Node::Acl(..)
             | Node::Invalid(..)
             | Node::Plugin(_)
+            | Node::Deallocate(..)
             | Node::Parameter(..) => Err(SbroadError::Invalid(
                 Entity::Node,
                 Some(format_smolstr!(
@@ -45,6 +46,7 @@ impl Plan {
             | MutNode::Acl(..)
             | MutNode::Invalid(..)
             | MutNode::Plugin(_)
+            | MutNode::Deallocate(..)
             | MutNode::Parameter(..) => Err(SbroadError::Invalid(
                 Entity::Node,
                 Some(format_smolstr!(
diff --git a/sbroad/sbroad-core/src/ir/expression/types.rs b/sbroad/sbroad-core/src/ir/expression/types.rs
index 1a1b73bda9..cb7d84e8da 100644
--- a/sbroad/sbroad-core/src/ir/expression/types.rs
+++ b/sbroad/sbroad-core/src/ir/expression/types.rs
@@ -43,6 +43,10 @@ impl Plan {
                 Entity::Node,
                 Some("Plugin node has no type".to_smolstr()),
             )),
+            Node::Deallocate(_) => Err(SbroadError::Invalid(
+                Entity::Node,
+                Some("Deallocate node has no type".to_smolstr()),
+            )),
         }
     }
 }
diff --git a/sbroad/sbroad-core/src/ir/node.rs b/sbroad/sbroad-core/src/ir/node.rs
index b136d4c916..26d2af47a6 100644
--- a/sbroad/sbroad-core/src/ir/node.rs
+++ b/sbroad/sbroad-core/src/ir/node.rs
@@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt::Display};
 use acl::{Acl, AclOwned, MutAcl};
 use block::{Block, BlockOwned, MutBlock};
 use ddl::{Ddl, DdlOwned, MutDdl};
+use deallocate::Deallocate;
 use expression::{ExprOwned, Expression, MutExpression};
 use relational::{MutRelational, RelOwned, Relational};
 use serde::{Deserialize, Serialize};
@@ -35,6 +36,7 @@ use plugin::{
 pub mod acl;
 pub mod block;
 pub mod ddl;
+pub mod deallocate;
 pub mod expression;
 pub mod plugin;
 pub mod relational;
@@ -1061,6 +1063,7 @@ pub enum Node32 {
     SelectWithoutScan(SelectWithoutScan),
     UnionAll(UnionAll),
     Values(Values),
+    Deallocate(Deallocate),
 }
 
 impl Node32 {
@@ -1089,6 +1092,7 @@ impl Node32 {
             Node32::Union(un) => NodeOwned::Relational(RelOwned::Union(un)),
             Node32::UnionAll(union_all) => NodeOwned::Relational(RelOwned::UnionAll(union_all)),
             Node32::Values(values) => NodeOwned::Relational(RelOwned::Values(values)),
+            Node32::Deallocate(deallocate) => NodeOwned::Deallocate(deallocate),
         }
     }
 }
@@ -1321,6 +1325,7 @@ pub enum Node<'nodes> {
     Parameter(&'nodes Parameter),
     Invalid(&'nodes Invalid),
     Plugin(Plugin<'nodes>),
+    Deallocate(&'nodes Deallocate),
 }
 
 #[allow(clippy::module_name_repetitions)]
@@ -1334,6 +1339,7 @@ pub enum MutNode<'nodes> {
     Parameter(&'nodes mut Parameter),
     Invalid(&'nodes mut Invalid),
     Plugin(MutPlugin<'nodes>),
+    Deallocate(&'nodes mut Deallocate),
 }
 
 impl Node<'_> {
@@ -1348,6 +1354,7 @@ impl Node<'_> {
             Node::Parameter(param) => NodeOwned::Parameter((*param).clone()),
             Node::Invalid(inv) => NodeOwned::Invalid((*inv).clone()),
             Node::Plugin(plugin) => NodeOwned::Plugin(plugin.get_plugin_owned()),
+            Node::Deallocate(deallocate) => NodeOwned::Deallocate((*deallocate).clone()),
         }
     }
 }
@@ -1364,6 +1371,7 @@ pub enum NodeOwned {
     Parameter(Parameter),
     Invalid(Invalid),
     Plugin(PluginOwned),
+    Deallocate(Deallocate),
 }
 
 impl From<NodeOwned> for NodeAligned {
@@ -1377,6 +1385,7 @@ impl From<NodeOwned> for NodeAligned {
             NodeOwned::Parameter(param) => param.into(),
             NodeOwned::Relational(rel) => rel.into(),
             NodeOwned::Plugin(p) => p.into(),
+            NodeOwned::Deallocate(d) => d.into(),
         }
     }
 }
diff --git a/sbroad/sbroad-core/src/ir/node/deallocate.rs b/sbroad/sbroad-core/src/ir/node/deallocate.rs
new file mode 100644
index 0000000000..fb691dddfa
--- /dev/null
+++ b/sbroad/sbroad-core/src/ir/node/deallocate.rs
@@ -0,0 +1,42 @@
+use crate::errors::{Entity, SbroadError};
+use crate::ir::node::NodeAligned;
+use crate::ir::{Node, NodeId, Plan};
+use serde::{Deserialize, Serialize};
+use smol_str::{format_smolstr, SmolStr};
+
+use super::{Node32, NodeOwned};
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Deallocate {
+    pub name: Option<SmolStr>,
+}
+
+impl From<Deallocate> for NodeAligned {
+    fn from(deallocate: Deallocate) -> Self {
+        Self::Node32(Node32::Deallocate(deallocate))
+    }
+}
+
+impl From<Deallocate> for NodeOwned {
+    fn from(deallocate: Deallocate) -> Self {
+        NodeOwned::Deallocate(deallocate)
+    }
+}
+
+impl Plan {
+    /// Get Deallocate node from the plan arena.
+    ///
+    /// # Errors
+    /// - the node index is absent in arena
+    /// - current node is not of Deallocate type
+    pub fn get_deallocate_node(&self, node_id: NodeId) -> Result<Deallocate, SbroadError> {
+        let node = self.get_node(node_id)?;
+        match node {
+            Node::Deallocate(deallocate) => Ok(deallocate.clone()),
+            _ => Err(SbroadError::Invalid(
+                Entity::Node,
+                Some(format_smolstr!("node is not Deallocate type: {node:?}")),
+            )),
+        }
+    }
+}
diff --git a/sbroad/sbroad-core/src/ir/tree/expression.rs b/sbroad/sbroad-core/src/ir/tree/expression.rs
index 6290ae7a85..2cf20ae2ab 100644
--- a/sbroad/sbroad-core/src/ir/tree/expression.rs
+++ b/sbroad/sbroad-core/src/ir/tree/expression.rs
@@ -174,6 +174,7 @@ fn expression_next<'nodes>(iter: &mut impl ExpressionTreeIterator<'nodes>) -> Op
                 | Node::Relational(_)
                 | Node::Invalid(_)
                 | Node::Plugin(_)
+                | Node::Deallocate(_)
                 | Node::Parameter(_) => None,
             }
         }
diff --git a/sbroad/sbroad-core/src/ir/tree/relation.rs b/sbroad/sbroad-core/src/ir/tree/relation.rs
index a436f961e6..a269b13506 100644
--- a/sbroad/sbroad-core/src/ir/tree/relation.rs
+++ b/sbroad/sbroad-core/src/ir/tree/relation.rs
@@ -109,7 +109,8 @@ fn relational_next<'nodes>(iter: &mut impl RelationalTreeIterator<'nodes>) -> Op
             | Node::Ddl(_)
             | Node::Acl(_)
             | Node::Block(_)
-            | Node::Plugin(_) => None,
+            | Node::Plugin(_)
+            | Node::Deallocate(_) => None,
         },
         None => None,
     }
diff --git a/sbroad/sbroad-core/src/ir/tree/subtree.rs b/sbroad/sbroad-core/src/ir/tree/subtree.rs
index 0495d3edc1..fc52d2e39c 100644
--- a/sbroad/sbroad-core/src/ir/tree/subtree.rs
+++ b/sbroad/sbroad-core/src/ir/tree/subtree.rs
@@ -206,7 +206,8 @@ fn subtree_next<'plan>(
             | Node::Ddl(..)
             | Node::Acl(..)
             | Node::Block(..)
-            | Node::Plugin(..) => None,
+            | Node::Plugin(..)
+            | Node::Deallocate(..) => None,
             Node::Expression(expr) => match expr {
                 Expression::Alias { .. }
                 | Expression::ExprInParentheses { .. }
diff --git a/src/pgproto/backend.rs b/src/pgproto/backend.rs
index 671201d78c..1270e80776 100644
--- a/src/pgproto/backend.rs
+++ b/src/pgproto/backend.rs
@@ -137,12 +137,17 @@ pub fn bind(
         plan.raw_options = apply_default_options(&plan.raw_options, &default_options);
     }
 
-    if !plan.is_empty() && !plan.is_ddl()? && !plan.is_acl()? && !plan.is_plugin()? {
+    if !plan.is_empty()
+        && !plan.is_ddl()?
+        && !plan.is_acl()?
+        && !plan.is_plugin()?
+        && !plan.is_deallocate()?
+    {
         plan.bind_params(params)?;
         plan.apply_options()?;
         plan.optimize()?;
     }
-    let portal = Portal::new(plan, statement.clone(), result_format)?;
+    let portal = Portal::new(plan, statement.clone(), result_format, client_id)?;
 
     PG_PORTALS.with(|storage| {
         storage
@@ -225,6 +230,19 @@ pub fn close_client_portals(id: ClientId) {
     PG_PORTALS.with(|storage| storage.borrow_mut().remove_by_client_id(id))
 }
 
+pub fn deallocate_statement(id: ClientId, name: &str) -> PgResult<()> {
+    // In contrast to closing, deallocation can cause an error in PG.
+    PG_STATEMENTS.with(|storage| {
+        storage
+            .borrow_mut()
+            .remove(&(id, name.into()))
+            .ok_or_else(|| {
+                PgError::Other(format!("prepared statement {} does not exist.", name).into())
+            })
+    })?;
+    Ok(())
+}
+
 /// Each postgres client uses its own backend to handle incoming messages.
 pub struct Backend {
     /// A unique identificator of a postgres client. It is used as a part of a key in the portal
diff --git a/src/pgproto/backend/describe.rs b/src/pgproto/backend/describe.rs
index 522ce5e2be..29231524c3 100644
--- a/src/pgproto/backend/describe.rs
+++ b/src/pgproto/backend/describe.rs
@@ -37,6 +37,7 @@ pub enum QueryType {
     Dql = 3,
     Explain = 4,
     Empty = 5,
+    Deallocate = 7,
 }
 
 #[derive(Clone, Debug, Default, Deserialize_repr, Serialize_repr)]
@@ -55,6 +56,8 @@ pub enum CommandTag {
     DropProcedure = 15,
     DropRole = 3,
     DropTable = 4,
+    Deallocate = 48,
+    DeallocateAll = 49,
     Delete = 5,
     DisablePlugin = 33,
     DropIndex = 19,
@@ -87,6 +90,8 @@ impl CommandTag {
             Self::CreateRole => "CREATE ROLE",
             Self::CreateTable => "CREATE TABLE",
             Self::CreateIndex => "CREATE INDEX",
+            Self::Deallocate => "DEALLOCATE",
+            Self::DeallocateAll => "DEALLOCATE ALL",
             Self::DropRole => "DROP ROLE",
             Self::DropTable => "DROP TABLE",
             Self::DropIndex => "DROP INDEX",
@@ -159,6 +164,7 @@ impl From<CommandTag> for QueryType {
             | CommandTag::CallProcedure => QueryType::Dml,
             CommandTag::Explain => QueryType::Explain,
             CommandTag::Select => QueryType::Dql,
+            CommandTag::Deallocate | CommandTag::DeallocateAll => QueryType::Deallocate,
             CommandTag::EmptyQuery => QueryType::Empty,
         }
     }
@@ -207,6 +213,12 @@ impl TryFrom<&Node<'_>> for CommandTag {
                 Plugin::RemoveServiceFromTier { .. } => Ok(CommandTag::RemoveTier),
                 Plugin::ChangeConfig { .. } => Ok(CommandTag::ChangeConfig),
             },
+
+            Node::Deallocate(deallocate) => match deallocate.name {
+                Some(_) => Ok(CommandTag::Deallocate),
+                None => Ok(CommandTag::DeallocateAll),
+            },
+
             Node::Relational(rel) => match rel {
                 Relational::Delete { .. } => Ok(CommandTag::Delete),
                 Relational::Insert { .. } => Ok(CommandTag::Insert),
@@ -379,7 +391,7 @@ impl Describe {
         };
         let query_type = command_tag.clone().into();
         match query_type {
-            QueryType::Acl | QueryType::Ddl | QueryType::Dml => {
+            QueryType::Acl | QueryType::Ddl | QueryType::Dml | QueryType::Deallocate => {
                 Ok(Describe::default().with_command_tag(command_tag))
             }
             QueryType::Dql => Ok(Describe::default()
@@ -404,7 +416,11 @@ impl Describe {
 
     pub fn row_description(&self) -> Option<RowDescription> {
         match self.query_type() {
-            QueryType::Acl | QueryType::Ddl | QueryType::Dml | QueryType::Empty => None,
+            QueryType::Acl
+            | QueryType::Ddl
+            | QueryType::Dml
+            | QueryType::Deallocate
+            | QueryType::Empty => None,
             QueryType::Dql | QueryType::Explain => {
                 let row_description = self
                     .metadata
@@ -487,7 +503,11 @@ impl PortalDescribe {
 impl PortalDescribe {
     pub fn row_description(&self) -> Option<RowDescription> {
         match self.query_type() {
-            QueryType::Acl | QueryType::Ddl | QueryType::Dml | QueryType::Empty => None,
+            QueryType::Acl
+            | QueryType::Ddl
+            | QueryType::Dml
+            | QueryType::Deallocate
+            | QueryType::Empty => None,
             QueryType::Dql | QueryType::Explain => {
                 let metadata = &self.describe.metadata;
                 let output_format = &self.output_format;
diff --git a/src/pgproto/backend/storage.rs b/src/pgproto/backend/storage.rs
index 19c523a908..0b81c07a34 100644
--- a/src/pgproto/backend/storage.rs
+++ b/src/pgproto/backend/storage.rs
@@ -1,4 +1,5 @@
 use super::{
+    close_client_statements, deallocate_statement,
     describe::{Describe, MetadataColumn, PortalDescribe, QueryType, StatementDescribe},
     result::{ExecuteResult, Rows},
 };
@@ -379,6 +380,7 @@ pub struct Portal {
     statement: Statement,
     describe: PortalDescribe,
     state: PortalState,
+    id: ClientId,
 }
 
 #[derive(Debug, Default)]
@@ -438,6 +440,7 @@ impl Portal {
         plan: Plan,
         statement: Statement,
         output_format: Vec<FieldFormat>,
+        id: ClientId,
     ) -> PgResult<Self> {
         let stmt_describe = statement.describe();
         let describe = PortalDescribe::new(stmt_describe.describe.clone(), output_format);
@@ -446,6 +449,7 @@ impl Portal {
             statement,
             describe,
             state: PortalState::NotStarted,
+            id,
         })
     }
 
@@ -511,6 +515,18 @@ impl Portal {
                     .collect::<PgResult<Vec<Vec<_>>>>()?;
                 PortalState::StreamingRows(pg_rows.into_iter())
             }
+            QueryType::Deallocate => {
+                let tag = self.describe().command_tag();
+                let ir_plan = self.statement().plan();
+                let top_id = ir_plan.get_top()?;
+                let deallocate = ir_plan.get_deallocate_node(top_id)?;
+                let name = deallocate.name.as_ref().map(|name| name.as_str());
+                match name {
+                    Some(name) => deallocate_statement(self.id, name)?,
+                    None => close_client_statements(self.id),
+                };
+                PortalState::ResultReady(ExecuteResult::AclOrDdl { tag })
+            }
             QueryType::Empty => PortalState::ResultReady(ExecuteResult::Empty),
         };
         Ok(())
diff --git a/src/sql.rs b/src/sql.rs
index 2556570048..68d6cbbcf4 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -218,6 +218,10 @@ pub fn dispatch(mut query: Query<RouterRuntime>) -> traft::Result<Tuple> {
         return empty_query_response();
     }
 
+    if query.is_deallocate()? {
+        return empty_query_response();
+    }
+
     if query.is_ddl()? || query.is_acl()? {
         let ir_plan = query.get_exec_plan().get_ir_plan();
         let top_id = ir_plan.get_top()?;
diff --git a/test/pgproto/extended_query_test.py b/test/pgproto/extended_query_test.py
index c4aa709305..483a14591e 100644
--- a/test/pgproto/extended_query_test.py
+++ b/test/pgproto/extended_query_test.py
@@ -282,3 +282,75 @@ def test_empty_queries(postgres: Postgres):
     cur = conn.execute(" ; ", prepare=True)
     assert cur.pgresult is not None
     assert cur.pgresult.status == ExecStatus.EMPTY_QUERY
+
+
+def test_deallocate(postgres: Postgres):
+    user = "admin"
+    password = "P@ssw0rd"
+    postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'")
+
+    os.environ["PGSSLMODE"] = "disable"
+    conn = pg.Connection(
+        user, password=password, host=postgres.host, port=postgres.port
+    )
+    conn.autocommit = True
+
+    # Remove unprepared statement
+    statement_name = "not_existing_name"
+    ps = conn.prepare(f"DEALLOCATE {statement_name}")
+    with pytest.raises(
+        DatabaseError, match=f"prepared statement {statement_name} does not exist."
+    ):
+        ps.run()
+
+    # Remove statement with .close()
+    ps = conn.prepare("SELECT 1")
+    ps.run()
+
+    ps.close()
+
+    # run a closed statement
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps.run()
+
+    # Remove statement with DEALLOCATE statement_name
+    ps = conn.prepare("SELECT 1")
+    ps.run()
+
+    # Decode the binary name to a string
+    statement_name = ps.name_bin.decode("utf-8").strip("\x00")
+    # Use the decoded name in the DEALLOCATE statement
+    ps_deallocate = conn.prepare(f"DEALLOCATE {statement_name}")
+
+    # check that prepare ps_deallocate doesn't actually deallocate statement
+    ps.run()
+
+    ps_deallocate.run()
+
+    # run a closed statement
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps.run()
+
+    # Remove statements with DEALLOCATE ALL
+    ps1 = conn.prepare("SELECT 1")
+    ps1.run()
+
+    ps2 = conn.prepare("SELECT 1")
+    ps2.run()
+
+    ps_deallocate = conn.prepare("DEALLOCATE ALL")
+
+    ps1.run()
+    ps2.run()
+
+    ps_deallocate.run()
+
+    # run a closed statement1
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps1.run()
+
+    # run a closed statement2
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps2.run()
+
+    conn.close()
diff --git a/test/pgproto/simple_query_test.py b/test/pgproto/simple_query_test.py
index d083b0b7ef..c547c4ea46 100644
--- a/test/pgproto/simple_query_test.py
+++ b/test/pgproto/simple_query_test.py
@@ -5,6 +5,8 @@ from conftest import Postgres
 from decimal import Decimal
 import psycopg
 from psycopg.pq import ExecStatus
+import pg8000.native as pg2  # type: ignore
+from pg8000.exceptions import DatabaseError  # type: ignore
 
 
 def test_simple_query_flow_errors(postgres: Postgres):
@@ -224,3 +226,64 @@ def test_empty_queries(postgres: Postgres):
     cur = conn.execute(" ; ", prepare=False)
     assert cur.pgresult is not None
     assert cur.pgresult.status == ExecStatus.EMPTY_QUERY
+
+
+def test_deallocate(postgres: Postgres):
+    user = "admin"
+    password = "P@ssw0rd"
+    postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'")
+
+    os.environ["PGSSLMODE"] = "disable"
+    conn = pg2.Connection(
+        user, password=password, host=postgres.host, port=postgres.port
+    )
+    conn.autocommit = True
+
+    # Remove unprepared statement
+    statement_name = "not_existing_name"
+    with pytest.raises(
+        DatabaseError, match=f"prepared statement {statement_name} does not exist."
+    ):
+        conn.run(f"DEALLOCATE {statement_name}")
+
+    # Remove statement with .close()
+    ps = conn.prepare("SELECT 1")
+    ps.run()
+
+    ps.close()
+
+    # run a closed statement
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps.run()
+
+    # Remove statement with DEALLOCATE statement_name
+    ps = conn.prepare("SELECT 1")
+    ps.run()
+
+    # Decode the binary name to a string
+    statement_name = ps.name_bin.decode("utf-8").strip("\x00")
+    # Use the decoded name in the DEALLOCATE statement
+    conn.run(f"DEALLOCATE {statement_name}")
+
+    # run a closed statement
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps.run()
+
+    # Remove statements with DEALLOCATE ALL
+    ps1 = conn.prepare("SELECT 1")
+    ps1.run()
+
+    ps2 = conn.prepare("SELECT 1")
+    ps2.run()
+
+    conn.run("DEALLOCATE ALL")
+
+    # run a closed statement1
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps1.run()
+
+    # run a closed statement2
+    with pytest.raises(DatabaseError, match="Couldn't find statement"):
+        ps2.run()
+
+    conn.close()
-- 
GitLab