From 2e512c5f708055d253fb94b0e975a4dfabbbcfb3 Mon Sep 17 00:00:00 2001
From: Erik Khamitov <e.khamitov@picodata.io>
Date: Tue, 17 Dec 2024 16:07:59 +0300
Subject: [PATCH] feat(sql): mock DDL create schema, drop schema

---
 .../test_app/test/integration/ddl_test.lua    | 30 ++++++++++
 sbroad/sbroad-core/src/frontend/sql.rs        | 11 ++++
 .../sbroad-core/src/frontend/sql/query.pest   |  7 ++-
 sbroad/sbroad-core/src/ir.rs                  |  4 ++
 sbroad/sbroad-core/src/ir/node.rs             |  4 ++
 sbroad/sbroad-core/src/ir/node/ddl.rs         | 55 +++++++++++--------
 src/pgproto/backend/describe.rs               |  8 +++
 src/sql.rs                                    | 27 ++++++++-
 test/pgproto/extended_query_test.py           | 36 ++++++++++++
 test/pgproto/simple_query_test.py             | 36 ++++++++++++
 10 files changed, 193 insertions(+), 25 deletions(-)

diff --git a/sbroad/sbroad-cartridge/test_app/test/integration/ddl_test.lua b/sbroad/sbroad-cartridge/test_app/test/integration/ddl_test.lua
index 4a474ad33d..efb4bf5e4c 100644
--- a/sbroad/sbroad-cartridge/test_app/test/integration/ddl_test.lua
+++ b/sbroad/sbroad-cartridge/test_app/test/integration/ddl_test.lua
@@ -231,3 +231,33 @@ g.test_create_table = function()
         [[Sbroad Error: DDL queries are not supported]]
     )
 end
+
+g.test_create_schema = function()
+    local api = cluster:server("api-1").net_box
+
+    local _, err = api:call(
+        "sbroad.execute",
+        { [[
+            CREATE SCHEMA test_schema
+        ]], {} }
+    )
+    t.assert_equals(
+        string.format("%s", err),
+        [[Sbroad Error: DDL queries are not supported]]
+    )
+end
+
+g.test_drop_schema = function()
+    local api = cluster:server("api-1").net_box
+
+    local _, err = api:call(
+        "sbroad.execute",
+        { [[
+            DROP SCHEMA test_schema
+        ]], {} }
+    )
+    t.assert_equals(
+        string.format("%s", err),
+        [[Sbroad Error: DDL queries are not supported]]
+    )
+end
\ No newline at end of file
diff --git a/sbroad/sbroad-core/src/frontend/sql.rs b/sbroad/sbroad-core/src/frontend/sql.rs
index 2d9edabbaf..f063cda6b8 100644
--- a/sbroad/sbroad-core/src/frontend/sql.rs
+++ b/sbroad/sbroad-core/src/frontend/sql.rs
@@ -3,6 +3,7 @@
 //! Parses an SQL statement to the abstract syntax tree (AST)
 //! and builds the intermediate representation (IR).
 
+use crate::ir::node::ddl::DdlOwned;
 use crate::ir::node::deallocate::Deallocate;
 use crate::ir::node::tcl::Tcl;
 use crate::ir::node::{Reference, ReferenceAsteriskSource};
@@ -4301,6 +4302,11 @@ impl AbstractSyntaxTree {
                     let plan_id = plan.nodes.push(create_index.into());
                     map.add(id, plan_id);
                 }
+                Rule::CreateSchema => {
+                    let create_schema = DdlOwned::CreateSchema;
+                    let plan_id = plan.nodes.push(create_schema.into());
+                    map.add(id, plan_id);
+                }
                 Rule::AlterSystem => {
                     let alter_system =
                         parse_alter_system(self, node, pairs_map, &mut worker, &mut plan)?;
@@ -4348,6 +4354,11 @@ impl AbstractSyntaxTree {
                     let plan_id = plan.nodes.push(drop_index.into());
                     map.add(id, plan_id);
                 }
+                Rule::DropSchema => {
+                    let drop_schema = DdlOwned::DropSchema;
+                    let plan_id = plan.nodes.push(drop_schema.into());
+                    map.add(id, plan_id);
+                }
                 Rule::DropRole => {
                     let mut name = None;
                     let mut timeout = get_default_timeout();
diff --git a/sbroad/sbroad-core/src/frontend/sql/query.pest b/sbroad/sbroad-core/src/frontend/sql/query.pest
index c443a57273..5b6554b431 100644
--- a/sbroad/sbroad-core/src/frontend/sql/query.pest
+++ b/sbroad/sbroad-core/src/frontend/sql/query.pest
@@ -78,7 +78,7 @@ ACL = _{ DropRole | DropUser | CreateRole | CreateUser | AlterUser | GrantPrivil
             PrivilegeUsage = { ^"usage" }
             PrivilegeWrite = { ^"write" }
 
-DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex
+DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex | CreateSchema | DropSchema
          | CreateProc | DropProc | RenameProc | SetParam | SetTransaction | AlterSystem
          | CreatePartition }
     CreatePartition = ${
@@ -172,6 +172,11 @@ DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex
         Hint = !{ ^"hint" ~ "=" ~ (True | False) }
     DropIndex = ${ ^"drop" ~ W ~ ^"index" ~ W ~ (IfExists ~ W)? ~ Identifier ~ (W ~ WaitApplied)? ~ (W ~ TimeoutOption)? }
 
+    CreateSchema = ${ ^"create" ~ W ~ ^"schema" ~ W ~ (IfNotExists ~ W)? ~ Identifier }
+    DropSchema = ${ ^"drop" ~ W ~ ^"schema" ~ W ~ (IfExists ~ W)? ~ Identifier ~ (W ~ (Cascade | Restrict))? }
+        Cascade = _{ ^"cascade" }
+        Restrict = _{ ^"restrict" }
+
     SetParam = ${ ^"set" ~ W ~ (SetScope ~ W)? ~ ConfParam  }
         SetScope = { ScopeSession | ScopeLocal }
             ScopeSession  = { ^"session" }
diff --git a/sbroad/sbroad-core/src/ir.rs b/sbroad/sbroad-core/src/ir.rs
index 4b804ed9e6..130d84934e 100644
--- a/sbroad/sbroad-core/src/ir.rs
+++ b/sbroad/sbroad-core/src/ir.rs
@@ -110,6 +110,8 @@ impl Nodes {
                     Tcl::Commit => Node::Tcl(Tcl::Commit),
                     Tcl::Rollback => Node::Tcl(Tcl::Rollback),
                 },
+                Node32::CreateSchema => Node::Ddl(Ddl::CreateSchema),
+                Node32::DropSchema => Node::Ddl(Ddl::DropSchema),
             }),
             ArenaType::Arena64 => self.arena64.get(id.offset as usize).map(|node| match node {
                 Node64::Case(case) => Node::Expression(Expression::Case(case)),
@@ -241,6 +243,8 @@ impl Nodes {
                         Tcl::Commit => MutNode::Tcl(node::tcl::Tcl::Commit),
                         Tcl::Rollback => MutNode::Tcl(node::tcl::Tcl::Rollback),
                     },
+                    Node32::CreateSchema => MutNode::Ddl(MutDdl::CreateSchema),
+                    Node32::DropSchema => MutNode::Ddl(MutDdl::DropSchema),
                 }),
             ArenaType::Arena64 => self
                 .arena64
diff --git a/sbroad/sbroad-core/src/ir/node.rs b/sbroad/sbroad-core/src/ir/node.rs
index 8929fa9f60..a228fae44c 100644
--- a/sbroad/sbroad-core/src/ir/node.rs
+++ b/sbroad/sbroad-core/src/ir/node.rs
@@ -1067,6 +1067,8 @@ pub enum Node32 {
     Values(Values),
     Deallocate(Deallocate),
     Tcl(Tcl),
+    CreateSchema,
+    DropSchema,
 }
 
 impl Node32 {
@@ -1101,6 +1103,8 @@ impl Node32 {
                 Tcl::Commit => NodeOwned::Tcl(Tcl::Commit),
                 Tcl::Rollback => NodeOwned::Tcl(Tcl::Rollback),
             },
+            Node32::CreateSchema => NodeOwned::Ddl(DdlOwned::CreateSchema),
+            Node32::DropSchema => NodeOwned::Ddl(DdlOwned::DropSchema),
         }
     }
 }
diff --git a/sbroad/sbroad-core/src/ir/node/ddl.rs b/sbroad/sbroad-core/src/ir/node/ddl.rs
index 85c5c0a86d..65926d463c 100644
--- a/sbroad/sbroad-core/src/ir/node/ddl.rs
+++ b/sbroad/sbroad-core/src/ir/node/ddl.rs
@@ -1,12 +1,11 @@
-use serde::Serialize;
-use smol_str::{format_smolstr, ToSmolStr};
-
-use crate::errors::{Entity, SbroadError};
-
 use super::{
     AlterSystem, CreateIndex, CreateProc, CreateTable, DropIndex, DropProc, DropTable, NodeAligned,
     RenameRoutine, SetParam, SetTransaction,
 };
+use crate::errors::{Entity, SbroadError};
+use crate::ir::Node32;
+use serde::Serialize;
+use smol_str::{format_smolstr, ToSmolStr};
 
 #[allow(clippy::module_name_repetitions)]
 #[derive(Clone, Debug, PartialEq, Eq, Serialize)]
@@ -19,6 +18,8 @@ pub enum DdlOwned {
     AlterSystem(AlterSystem),
     CreateIndex(CreateIndex),
     DropIndex(DropIndex),
+    CreateSchema,
+    DropSchema,
     SetParam(SetParam),
     SetTransaction(SetTransaction),
 }
@@ -39,16 +40,16 @@ impl DdlOwned {
             | DdlOwned::AlterSystem(AlterSystem { ref timeout, .. })
             | DdlOwned::CreateProc(CreateProc { ref timeout, .. })
             | DdlOwned::DropProc(DropProc { ref timeout, .. })
-            | DdlOwned::RenameRoutine(RenameRoutine { ref timeout, .. }) => timeout,
+            | DdlOwned::RenameRoutine(RenameRoutine { ref timeout, .. }) => {
+                timeout.to_smolstr().parse().map_err(|e| {
+                    SbroadError::Invalid(
+                        Entity::SpaceMetadata,
+                        Some(format_smolstr!("timeout parsing error {e:?}")),
+                    )
+                })
+            }
+            DdlOwned::CreateSchema | DdlOwned::DropSchema => Ok(0.0),
         }
-        .to_smolstr()
-        .parse()
-        .map_err(|e| {
-            SbroadError::Invalid(
-                Entity::SpaceMetadata,
-                Some(format_smolstr!("timeout parsing error {e:?}")),
-            )
-        })
     }
 
     pub fn wait_applied_globally(&self) -> bool {
@@ -92,9 +93,11 @@ impl From<DdlOwned> for NodeAligned {
             DdlOwned::CreateIndex(create_index) => create_index.into(),
             DdlOwned::CreateProc(create_proc) => create_proc.into(),
             DdlOwned::CreateTable(create_table) => create_table.into(),
+            DdlOwned::CreateSchema => Self::Node32(Node32::CreateSchema),
             DdlOwned::DropIndex(drop_index) => drop_index.into(),
             DdlOwned::DropProc(drop_proc) => drop_proc.into(),
             DdlOwned::DropTable(drop_table) => drop_table.into(),
+            DdlOwned::DropSchema => Self::Node32(Node32::DropSchema),
             DdlOwned::AlterSystem(alter_system) => alter_system.into(),
             DdlOwned::RenameRoutine(rename) => rename.into(),
             DdlOwned::SetParam(set_param) => set_param.into(),
@@ -114,6 +117,8 @@ pub enum MutDdl<'a> {
     AlterSystem(&'a mut AlterSystem),
     CreateIndex(&'a mut CreateIndex),
     DropIndex(&'a mut DropIndex),
+    CreateSchema,
+    DropSchema,
     SetParam(&'a mut SetParam),
     SetTransaction(&'a mut SetTransaction),
 }
@@ -129,6 +134,8 @@ pub enum Ddl<'a> {
     AlterSystem(&'a AlterSystem),
     CreateIndex(&'a CreateIndex),
     DropIndex(&'a DropIndex),
+    CreateSchema,
+    DropSchema,
     SetParam(&'a SetParam),
     SetTransaction(&'a SetTransaction),
 }
@@ -149,16 +156,16 @@ impl Ddl<'_> {
             | Ddl::AlterSystem(AlterSystem { ref timeout, .. })
             | Ddl::CreateProc(CreateProc { ref timeout, .. })
             | Ddl::DropProc(DropProc { ref timeout, .. })
-            | Ddl::RenameRoutine(RenameRoutine { ref timeout, .. }) => timeout,
+            | Ddl::RenameRoutine(RenameRoutine { ref timeout, .. }) => {
+                timeout.to_smolstr().parse().map_err(|e| {
+                    SbroadError::Invalid(
+                        Entity::SpaceMetadata,
+                        Some(format_smolstr!("timeout parsing error {e:?}")),
+                    )
+                })
+            }
+            Ddl::CreateSchema | Ddl::DropSchema => Ok(0.0),
         }
-        .to_smolstr()
-        .parse()
-        .map_err(|e| {
-            SbroadError::Invalid(
-                Entity::SpaceMetadata,
-                Some(format_smolstr!("timeout parsing error {e:?}")),
-            )
-        })
     }
 
     pub fn wait_applied_globally(&self) -> bool {
@@ -202,6 +209,8 @@ impl Ddl<'_> {
             Ddl::CreateProc(create_proc) => DdlOwned::CreateProc((*create_proc).clone()),
             Ddl::CreateTable(create_table) => DdlOwned::CreateTable((*create_table).clone()),
             Ddl::DropIndex(drop_index) => DdlOwned::DropIndex((*drop_index).clone()),
+            Ddl::CreateSchema => DdlOwned::CreateSchema,
+            Ddl::DropSchema => DdlOwned::DropSchema,
             Ddl::DropProc(drop_proc) => DdlOwned::DropProc((*drop_proc).clone()),
             Ddl::DropTable(drop_table) => DdlOwned::DropTable((*drop_table).clone()),
             Ddl::AlterSystem(alter_system) => DdlOwned::AlterSystem((*alter_system).clone()),
diff --git a/src/pgproto/backend/describe.rs b/src/pgproto/backend/describe.rs
index 655c7e3b30..ebb4e2f306 100644
--- a/src/pgproto/backend/describe.rs
+++ b/src/pgproto/backend/describe.rs
@@ -54,6 +54,7 @@ pub enum CommandTag {
     CreateTable = 2,
     CreateIndex = 18,
     CreatePlugin = 31,
+    CreateSchema = 50,
     ChangeConfig = 39,
     Commit = 53,
     DropProcedure = 15,
@@ -65,6 +66,7 @@ pub enum CommandTag {
     DisablePlugin = 33,
     DropIndex = 19,
     DropPlugin = 34,
+    DropSchema = 51,
     EnablePlugin = 32,
     EmptyQuery = 55,
     Explain = 6,
@@ -92,11 +94,13 @@ impl CommandTag {
             Self::AlterRole => "ALTER ROLE",
             Self::AlterSystem => "ALTER SYSTEM",
             Self::CreateRole => "CREATE ROLE",
+            Self::CreateSchema => "CREATE SCHEMA",
             Self::CreateTable => "CREATE TABLE",
             Self::CreateIndex => "CREATE INDEX",
             Self::Deallocate => "DEALLOCATE",
             Self::DeallocateAll => "DEALLOCATE ALL",
             Self::DropRole => "DROP ROLE",
+            Self::DropSchema => "DROP SCHEMA",
             Self::DropTable => "DROP TABLE",
             Self::DropIndex => "DROP INDEX",
             Self::Delete => "DELETE",
@@ -152,8 +156,10 @@ impl From<CommandTag> for QueryType {
             | CommandTag::CreateTable
             | CommandTag::CreateProcedure
             | CommandTag::CreateIndex
+            | CommandTag::CreateSchema
             | CommandTag::RenameRoutine
             | CommandTag::DropIndex
+            | CommandTag::DropSchema
             | CommandTag::SetParam
             | CommandTag::SetTransaction
             | CommandTag::CreatePlugin
@@ -205,6 +211,8 @@ impl TryFrom<&Node<'_>> for CommandTag {
                 Ddl::CreateTable { .. } => Ok(CommandTag::CreateTable),
                 Ddl::CreateProc { .. } => Ok(CommandTag::CreateProcedure),
                 Ddl::CreateIndex { .. } => Ok(CommandTag::CreateIndex),
+                Ddl::CreateSchema => Ok(CommandTag::CreateSchema),
+                Ddl::DropSchema => Ok(CommandTag::DropSchema),
                 Ddl::DropProc { .. } => Ok(CommandTag::DropProcedure),
                 Ddl::DropIndex { .. } => Ok(CommandTag::DropIndex),
                 Ddl::RenameRoutine { .. } => Ok(CommandTag::RenameRoutine),
diff --git a/src/sql.rs b/src/sql.rs
index e3a82f5b1c..ac995f84ba 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -34,7 +34,7 @@ use sbroad::ir::acl::{AlterOption, GrantRevokeType, Privilege as SqlPrivilege};
 use sbroad::ir::ddl::{AlterSystemType, ParamDef};
 use sbroad::ir::node::acl::AclOwned;
 use sbroad::ir::node::block::Block;
-use sbroad::ir::node::ddl::DdlOwned;
+use sbroad::ir::node::ddl::{Ddl, DdlOwned};
 use sbroad::ir::node::expression::ExprOwned;
 use sbroad::ir::node::relational::Relational;
 use sbroad::ir::node::NodeId;
@@ -51,6 +51,7 @@ use sbroad::ir::node::plugin::{
     AppendServiceToTier, ChangeConfig, CreatePlugin, DisablePlugin, DropPlugin, EnablePlugin,
     MigrateTo, Plugin, RemoveServiceFromTier, SettingsPair,
 };
+use sbroad::ir::node::Node;
 use sbroad::ir::operator::ConflictStrategy;
 use sbroad::ir::relation::Type;
 use sbroad::ir::tree::traversal::{LevelNode, PostOrderWithFilter, REL_CAPACITY};
@@ -237,6 +238,25 @@ pub fn dispatch(mut query: Query<RouterRuntime>) -> traft::Result<Tuple> {
     if query.is_ddl()? || query.is_acl()? {
         let ir_plan = query.get_exec_plan().get_ir_plan();
         let top_id = ir_plan.get_top()?;
+
+        if let Node::Ddl(_) = ir_plan.get_node(top_id)? {
+            let ddl_node = ir_plan.get_ddl_node(top_id)?;
+            if let Ddl::CreateSchema = ddl_node {
+                tlog!(
+                    Warning,
+                    "DDL for schemas is currently unsupported. Empty query response provided for CREATE SCHEMA."
+                );
+                return empty_query_response();
+            }
+            if let Ddl::DropSchema = ddl_node {
+                tlog!(
+                    Warning,
+                    "DDL for schemas is currently unsupported. Empty query response provided for DROP SCHEMA."
+                );
+                return empty_query_response();
+            }
+        }
+
         let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan();
 
         let ir_node = ir_plan_mut.replace_with_stub(top_id);
@@ -1501,6 +1521,11 @@ fn ddl_ir_node_to_op_or_result(
             );
             Ok(Break(ConsumerResult { row_count: 0 }))
         }
+        DdlOwned::CreateSchema | DdlOwned::DropSchema => {
+            return Err(Error::Other(
+                "unreachable CreateSchema/DropSchema".to_string().into(),
+            ));
+        }
     }
 }
 
diff --git a/test/pgproto/extended_query_test.py b/test/pgproto/extended_query_test.py
index 128a67faf2..7f4060cba8 100644
--- a/test/pgproto/extended_query_test.py
+++ b/test/pgproto/extended_query_test.py
@@ -427,3 +427,39 @@ def test_tcl(postgres: Postgres):
     assert rows == [(1, "Alice"), (2, "Bob")]
 
     cur = conn.execute("DROP TABLE test_table;")
+
+
+def test_create_schema(postgres: Postgres):
+    user = "admin"
+    password = "P@ssw0rd"
+    host = postgres.host
+    port = postgres.port
+    postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'")
+
+    conn = psycopg.connect(
+        f"user={user} password={password} host={host} port={port} sslmode=disable"
+    )
+    conn.autocommit = True
+
+    cur = conn.execute("CREATE SCHEMA test_schema;", prepare=True)
+    assert cur.pgresult is not None
+    assert cur.pgresult.status == ExecStatus.COMMAND_OK
+    assert cur.statusmessage == "CREATE SCHEMA"
+
+
+def test_drop_schema(postgres: Postgres):
+    user = "admin"
+    password = "P@ssw0rd"
+    host = postgres.host
+    port = postgres.port
+    postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'")
+
+    conn = psycopg.connect(
+        f"user={user} password={password} host={host} port={port} sslmode=disable"
+    )
+    conn.autocommit = True
+
+    cur = conn.execute("DROP SCHEMA test_schema;", prepare=True)
+    assert cur.pgresult is not None
+    assert cur.pgresult.status == ExecStatus.COMMAND_OK
+    assert cur.statusmessage == "DROP SCHEMA"
diff --git a/test/pgproto/simple_query_test.py b/test/pgproto/simple_query_test.py
index 67300f2d9f..f2b4835320 100644
--- a/test/pgproto/simple_query_test.py
+++ b/test/pgproto/simple_query_test.py
@@ -360,3 +360,39 @@ def test_tcl(postgres: Postgres):
     assert sorted(rows, key=lambda x: x[0]) == [(1, "Alice"), (2, "Bob")]
 
     cur = conn.execute("DROP TABLE test_table;")
+
+
+def test_create_schema(postgres: Postgres):
+    user = "admin"
+    password = "P@ssw0rd"
+    host = postgres.host
+    port = postgres.port
+    postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'")
+
+    conn = psycopg.connect(
+        f"user={user} password={password} host={host} port={port} sslmode=disable"
+    )
+    conn.autocommit = True
+
+    cur = conn.execute("CREATE SCHEMA test_schema;", prepare=False)
+    assert cur.pgresult is not None
+    assert cur.pgresult.status == ExecStatus.COMMAND_OK
+    assert cur.statusmessage == "CREATE SCHEMA"
+
+
+def test_drop_schema(postgres: Postgres):
+    user = "admin"
+    password = "P@ssw0rd"
+    host = postgres.host
+    port = postgres.port
+    postgres.instance.sql(f"ALTER USER \"{user}\" WITH PASSWORD '{password}'")
+
+    conn = psycopg.connect(
+        f"user={user} password={password} host={host} port={port} sslmode=disable"
+    )
+    conn.autocommit = True
+
+    cur = conn.execute("DROP SCHEMA test_schema;", prepare=False)
+    assert cur.pgresult is not None
+    assert cur.pgresult.status == ExecStatus.COMMAND_OK
+    assert cur.statusmessage == "DROP SCHEMA"
-- 
GitLab