From 02835dcf6b228b7140432a44ebaea1a6d4f6864f Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Wed, 31 Jan 2024 13:29:36 +0700
Subject: [PATCH] feat: implement sql procedures (table and creation)

Current commit introduces
- _pico_routine replicated table
- procedure creation with SQL API
---
 CHANGELOG.md            |   4 +
 Cargo.lock              |  16 +--
 sbroad                  |   2 +-
 src/access_control.rs   |  38 +++++-
 src/cas.rs              |   2 +
 src/rpc/ddl_apply.rs    |  10 +-
 src/schema.rs           | 259 ++++++++++++++++++++++++++++++++++++++++
 src/sql.rs              |  56 ++++++++-
 src/sql/pgproto.rs      |   6 +-
 src/storage.rs          | 133 ++++++++++++++++++++-
 src/traft/node.rs       |  61 ++++++++++
 src/traft/op.rs         |  21 +++-
 tarantool               |   2 +-
 tarantool-sys           |   2 +-
 test/int/test_basics.py |   2 +
 test/int/test_sql.py    |  87 ++++++++++++++
 16 files changed, 683 insertions(+), 18 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e2f7702d4..5c888454ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,10 @@ with the `YY.0M.MICRO` scheme.
 - New option `--tier` for `picodata run` allows to specify whether an
   instance belongs to a tier.
 
+- Introduce a new _pico_routine system table for the SQL procedures.
+
+- Clusterwide SQL supports procedure creation.
+
 ### Compatibility
 
 - System table `_pico_replicaset` now has a different format: the field `master_id`
diff --git a/Cargo.lock b/Cargo.lock
index e12c260ba6..99ec66d217 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1478,9 +1478,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 
 [[package]]
 name = "pest"
-version = "2.7.5"
+version = "2.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae9cee2a55a544be8b89dc6848072af97a20f2422603c10865be2a42b580fff5"
+checksum = "1f200d8d83c44a45b21764d1916299752ca035d15ecd46faca3e9a2a2bf6ad06"
 dependencies = [
  "memchr",
  "thiserror",
@@ -1489,9 +1489,9 @@ dependencies = [
 
 [[package]]
 name = "pest_derive"
-version = "2.7.5"
+version = "2.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81d78524685f5ef2a3b3bd1cafbc9fcabb036253d9b1463e726a91cd16e2dfc2"
+checksum = "bcd6ab1236bbdb3a49027e920e693192ebfe8913f6d60e294de57463a493cfde"
 dependencies = [
  "pest",
  "pest_generator",
@@ -1499,9 +1499,9 @@ dependencies = [
 
 [[package]]
 name = "pest_generator"
-version = "2.7.5"
+version = "2.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "68bd1206e71118b5356dae5ddc61c8b11e28b09ef6a31acbd15ea48a28e0c227"
+checksum = "2a31940305ffc96863a735bef7c7994a00b325a7138fdbc5bda0f1a0476d3275"
 dependencies = [
  "pest",
  "pest_meta",
@@ -1512,9 +1512,9 @@ dependencies = [
 
 [[package]]
 name = "pest_meta"
-version = "2.7.5"
+version = "2.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c747191d4ad9e4a4ab9c8798f1e82a39affe7ef9648390b7e5548d18e099de6"
+checksum = "a7ff62f5259e53b78d1af898941cdcdccfae7385cf7d793a6e55de5d05bb4b7d"
 dependencies = [
  "once_cell",
  "pest",
diff --git a/sbroad b/sbroad
index aa4111ad6a..b4bce6959d 160000
--- a/sbroad
+++ b/sbroad
@@ -1 +1 @@
-Subproject commit aa4111ad6ae8f2c4b25e236f63c371eae89ed826
+Subproject commit b4bce6959deb68ab47b3fd5871e9851febb47288
diff --git a/src/access_control.rs b/src/access_control.rs
index b905c4d042..4185feac16 100644
--- a/src/access_control.rs
+++ b/src/access_control.rs
@@ -43,7 +43,7 @@ use tarantool::{
 
 use crate::{
     schema::{PrivilegeDef, PrivilegeType, SchemaObjectType as PicoSchemaObjectType, ADMIN_ID},
-    storage::{space_by_id, Clusterwide, ToEntryIter},
+    storage::{make_routine_not_found, space_by_id, Clusterwide, ToEntryIter},
     traft::op::{self, Op},
 };
 
@@ -206,6 +206,16 @@ fn access_check_ddl(ddl: &op::Ddl, as_user: UserId) -> tarantool::Result<()> {
                 as_user,
             )
         }
+        op::Ddl::CreateProcedure {
+            id, name, owner, ..
+        } => box_access_check_ddl_as_user(
+            name,
+            *id,
+            *owner,
+            TntSchemaObjectType::Function,
+            PrivType::Create,
+            as_user,
+        ),
     }
 }
 
@@ -396,6 +406,32 @@ fn access_check_grant_revoke(
                 grantor_id,
             );
         }
+
+        PicoSchemaObjectType::Routine => {
+            let routine = storage
+                .routines
+                .by_id(object_id)?
+                .ok_or_else(|| make_routine_not_found(object_id))?;
+
+            // Only owner or admin can grant on routine.
+            if routine.owner != grantor_id && grantor_id != ADMIN_ID {
+                return Err(make_access_denied(
+                    access_name,
+                    PicoSchemaObjectType::Routine,
+                    &routine.name,
+                    grantor.name,
+                ));
+            }
+
+            return box_access_check_ddl_as_user(
+                &routine.name,
+                routine.id,
+                grantor_id,
+                TntSchemaObjectType::Function,
+                access,
+                grantor_id,
+            );
+        }
     }
 
     Ok(())
diff --git a/src/cas.rs b/src/cas.rs
index 523849b2c4..bc8e6cc469 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -37,6 +37,7 @@ const PROHIBITED_SPACES: &[ClusterwideTable] = &[
     ClusterwideTable::User,
     ClusterwideTable::Role,
     ClusterwideTable::Privilege,
+    ClusterwideTable::Routine,
 ];
 
 // FIXME: cas::Error will be returned as a string when rpc is called
@@ -732,6 +733,7 @@ fn modifies_operable(op: &Op, space: SpaceId, storage: &Clusterwide) -> bool {
         Ddl::DropTable { id, .. } => *id == space,
         Ddl::CreateIndex { .. } => false,
         Ddl::DropIndex { .. } => false,
+        Ddl::CreateProcedure { .. } => false,
     };
     match op {
         Op::DdlPrepare { ddl, .. } => ddl_modifies(ddl),
diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index a441c5db25..72528a07fd 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -1,6 +1,8 @@
 use crate::op::Ddl;
 use crate::storage::Clusterwide;
-use crate::storage::{ddl_create_space_on_master, ddl_drop_space_on_master};
+use crate::storage::{
+    ddl_create_function_on_master, ddl_create_space_on_master, ddl_drop_space_on_master,
+};
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::tlog;
 use crate::traft::error::Error as TraftError;
@@ -135,6 +137,12 @@ pub fn apply_schema_change(
             }
         }
 
+        Ddl::CreateProcedure { id, ref name, .. } => {
+            if let Err(e) = ddl_create_function_on_master(id, name) {
+                return Err(Error::Aborted(e.to_string()));
+            }
+        }
+
         _ => {
             todo!();
         }
diff --git a/src/schema.rs b/src/schema.rs
index 5242e07176..d9a894a4f9 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -1,4 +1,5 @@
 use once_cell::sync::OnceCell;
+use sbroad::ir::ddl::Language;
 use std::borrow::Cow;
 use std::collections::{BTreeMap, HashSet};
 use std::fmt::Display;
@@ -24,6 +25,8 @@ use tarantool::{
     util::Value,
 };
 
+use sbroad::ir::value::Value as IrValue;
+
 use serde::{Deserialize, Serialize};
 
 use crate::cas::{self, compare_and_swap};
@@ -410,6 +413,7 @@ tarantool::define_str_enum! {
     pub enum SchemaObjectType {
         Table = "table",
         Role = "role",
+        Routine = "routine",
         User = "user",
         Universe = "universe",
     }
@@ -545,6 +549,10 @@ impl PrivilegeDef {
                 None => &[Create, Alter, Drop],
             },
             SchemaObjectType::Universe => &[Login],
+            SchemaObjectType::Routine => match privilege_def.object_id() {
+                Some(_) => &[Execute, Drop],
+                None => &[Create, Drop],
+            },
         };
 
         if !valid_privileges.contains(&privilege) {
@@ -688,6 +696,7 @@ impl PrivilegeDef {
                 debug_assert_eq!(self.object_id, 0);
                 return Ok(None);
             }
+            SchemaObjectType::Routine => storage.routines.by_id(id).map(|t| t.map(|t| t.name)),
         }
         .expect("storage should not fail")
         .ok_or_else(|| Error::other(format!("object with id {id} should exist")))?;
@@ -802,6 +811,175 @@ pub fn init_user_pico_service() {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// RoutineDef
+////////////////////////////////////////////////////////////////////////////////
+
+/// Routine kind.
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum RoutineKind {
+    #[default]
+    Procedure,
+}
+
+impl Display for RoutineKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            RoutineKind::Procedure => write!(f, "procedure"),
+        }
+    }
+}
+
+/// Parameter mode.
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum RoutineParamMode {
+    #[default]
+    In,
+}
+
+impl Display for RoutineParamMode {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            RoutineParamMode::In => write!(f, "in"),
+        }
+    }
+}
+
+/// Routine parameter definition.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RoutineParamDef {
+    #[serde(default)]
+    pub mode: RoutineParamMode,
+    pub r#type: FieldType,
+    #[serde(default)]
+    pub default: Option<IrValue>,
+}
+
+impl Display for RoutineParamDef {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "mode: {}, type: {}", self.mode, self.r#type)?;
+        if let Some(default) = &self.default {
+            write!(f, ", default: {}", default)?;
+        }
+        Ok(())
+    }
+}
+
+impl Default for RoutineParamDef {
+    fn default() -> Self {
+        Self {
+            mode: RoutineParamMode::default(),
+            r#type: FieldType::Scalar,
+            default: None,
+        }
+    }
+}
+
+impl RoutineParamDef {
+    pub fn with_type(self, r#type: FieldType) -> Self {
+        Self { r#type, ..self }
+    }
+}
+
+pub type RoutineParams = Vec<RoutineParamDef>;
+
+pub type RoutineReturns = Vec<IrValue>;
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum RoutineLanguage {
+    #[default]
+    SQL,
+}
+
+impl From<Language> for RoutineLanguage {
+    fn from(language: Language) -> Self {
+        match language {
+            Language::SQL => Self::SQL,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum RoutineSecurity {
+    #[default]
+    Invoker,
+}
+
+/// Routine definition.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RoutineDef {
+    pub id: u32,
+    pub name: String,
+    pub kind: RoutineKind,
+    pub params: RoutineParams,
+    pub returns: RoutineReturns,
+    pub language: RoutineLanguage,
+    pub body: String,
+    pub security: RoutineSecurity,
+    pub operable: bool,
+    pub schema_version: u64,
+    pub owner: UserId,
+}
+
+impl Encode for RoutineDef {}
+
+impl RoutineDef {
+    /// Index (0-based) of field "operable" in _pico_routine table format.
+    pub const FIELD_OPERABLE: usize = 8;
+
+    /// Format of the _pico_routine global table.
+    #[inline(always)]
+    pub fn format() -> Vec<tarantool::space::Field> {
+        use tarantool::space::Field;
+        vec![
+            Field::from(("id", FieldType::Unsigned)),
+            Field::from(("name", FieldType::String)),
+            Field::from(("kind", FieldType::String)),
+            Field::from(("params", FieldType::Array)),
+            Field::from(("returns", FieldType::Array)),
+            Field::from(("language", FieldType::String)),
+            Field::from(("body", FieldType::String)),
+            Field::from(("security", FieldType::String)),
+            Field::from(("operable", FieldType::Boolean)),
+            Field::from(("schema_version", FieldType::Unsigned)),
+            Field::from(("owner", FieldType::Unsigned)),
+        ]
+    }
+
+    /// A dummy instance of the type for use in tests.
+    #[inline(always)]
+    pub fn for_tests() -> Self {
+        Self {
+            id: 16005,
+            name: "proc".into(),
+            kind: RoutineKind::Procedure,
+            params: vec![
+                RoutineParamDef {
+                    mode: RoutineParamMode::In,
+                    r#type: FieldType::String,
+                    default: Some(IrValue::String("hello".into())),
+                },
+                RoutineParamDef {
+                    mode: RoutineParamMode::In,
+                    r#type: FieldType::Unsigned,
+                    default: None,
+                },
+            ],
+            returns: vec![],
+            language: RoutineLanguage::SQL,
+            body: "values (?), (?)".into(),
+            security: RoutineSecurity::Invoker,
+            operable: true,
+            schema_version: 421,
+            owner: 42,
+        }
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ...
 ////////////////////////////////////////////////////////////////////////////////
@@ -840,6 +1018,8 @@ pub enum DdlError {
     Aborted,
     #[error("there is no pending ddl operation")]
     NoPendingDdl,
+    #[error("{0}")]
+    CreateRoutine(#[from] CreateRoutineError),
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -868,6 +1048,28 @@ impl From<CreateTableError> for Error {
     }
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum CreateRoutineError {
+    #[error("routine {name} already exists with a different kind")]
+    ExistsWithDifferentKind { name: String },
+    #[error("routine {name} already exists with different parameters")]
+    ExistsWithDifferentParams { name: String },
+    #[error("routine {name} already exists with a different language")]
+    ExistsWithDifferentLanguage { name: String },
+    #[error("routine {name} already exists with a different body")]
+    ExistsWithDifferentBody { name: String },
+    #[error("routine {name} already exists with a different security")]
+    ExistsWithDifferentSecurity { name: String },
+    #[error("routine {name} already exists with a different owner")]
+    ExistsWithDifferentOwner { name: String },
+}
+
+impl From<CreateRoutineError> for Error {
+    fn from(err: CreateRoutineError) -> Self {
+        DdlError::CreateRoutine(err).into()
+    }
+}
+
 // TODO: Add `LuaRead` to tarantool::space::Field and use it
 #[derive(Clone, Debug, LuaRead)]
 pub struct Field {
@@ -895,6 +1097,55 @@ impl From<Field> for tarantool::space::Field {
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct CreateProcParams {
+    pub name: String,
+    pub params: RoutineParams,
+    pub language: RoutineLanguage,
+    pub body: String,
+    pub security: RoutineSecurity,
+    pub owner: UserId,
+}
+
+impl CreateProcParams {
+    pub fn func_exists(&self) -> bool {
+        let func_space = Space::from(SystemSpace::Func);
+
+        let name_idx = func_space
+            .index_cached("name")
+            .expect("_function should have an index by name");
+        let t = name_idx
+            .get(&[&self.name])
+            .expect("reading from _function shouldn't fail");
+        t.is_some()
+    }
+
+    pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
+        let routine = storage.routines.by_name(&self.name)?;
+        if let Some(def) = routine {
+            if def.kind != RoutineKind::Procedure {
+                return Err(CreateRoutineError::ExistsWithDifferentKind { name: def.name })?;
+            }
+            if def.params != self.params {
+                return Err(CreateRoutineError::ExistsWithDifferentParams { name: def.name })?;
+            }
+            if def.language != self.language {
+                return Err(CreateRoutineError::ExistsWithDifferentLanguage { name: def.name })?;
+            }
+            if def.body != self.body {
+                return Err(CreateRoutineError::ExistsWithDifferentBody { name: def.name })?;
+            }
+            if def.security != self.security {
+                return Err(CreateRoutineError::ExistsWithDifferentSecurity { name: def.name })?;
+            }
+            if def.owner != self.owner {
+                return Err(CreateRoutineError::ExistsWithDifferentOwner { name: def.name })?;
+            }
+        }
+        Ok(())
+    }
+}
+
 #[derive(Clone, Debug, LuaRead)]
 pub struct CreateTableParams {
     pub(crate) id: Option<SpaceId>,
@@ -1626,4 +1877,12 @@ mod test {
         let valid = &[Login];
         check_object_privilege(SchemaObjectType::Universe, valid, 0);
     }
+
+    #[test]
+    fn routine_def_matches_format() {
+        let i = RoutineDef::for_tests();
+        let tuple_data = i.to_tuple_buffer().unwrap();
+        let format = RoutineDef::format();
+        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "RoutineDef::format");
+    }
 }
diff --git a/src/sql.rs b/src/sql.rs
index 09043833e4..fa00f81ee8 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -1,8 +1,9 @@
 //! Clusterwide SQL query execution.
 
 use crate::schema::{
-    wait_for_ddl_commit, CreateTableParams, DistributionParam, Field, PrivilegeDef, PrivilegeType,
-    RoleDef, SchemaObjectType, ShardingFn, UserDef, ADMIN_ID,
+    wait_for_ddl_commit, CreateProcParams, CreateTableParams, DistributionParam, Field,
+    PrivilegeDef, PrivilegeType, RoleDef, RoutineLanguage, RoutineParamDef, RoutineParams,
+    RoutineSecurity, SchemaObjectType, ShardingFn, UserDef, ADMIN_ID,
 };
 use crate::sql::pgproto::{
     with_portals, BoxedPortal, Describe, Descriptor, UserDescriptors, PG_PORTALS,
@@ -36,6 +37,7 @@ use sbroad::ir::value::{LuaValue, Value};
 use sbroad::ir::{Node as IrNode, Plan as IrPlan};
 use sbroad::otm::{query_id, query_span, stat_query_span, OTM_CHAR_LIMIT};
 use serde::Deserialize;
+use tarantool::schema::function::func_next_reserved_id;
 
 use crate::storage::Clusterwide;
 use ::tarantool::access_control::{box_access_check_space, PrivType};
@@ -623,6 +625,35 @@ fn reenterable_schema_change_request(
 
     // Check parameters
     let params = match ir_node {
+        IrNode::Ddl(Ddl::CreateProc {
+            name,
+            params: args,
+            body,
+            language,
+            ..
+        }) => {
+            let args: RoutineParams = args
+                .into_iter()
+                .map(|p| {
+                    let field_type = FieldType::from(&p.data_type);
+                    RoutineParamDef::default().with_type(field_type)
+                })
+                .collect();
+            let language = RoutineLanguage::from(language);
+            let security = RoutineSecurity::default();
+
+            let params = CreateProcParams {
+                name,
+                params: args,
+                language,
+                body,
+                security,
+                owner: current_user,
+            };
+            params.validate(storage)?;
+            Params::CreateProcedure(params)
+        }
+
         IrNode::Ddl(Ddl::CreateTable {
             name,
             format,
@@ -749,6 +780,26 @@ fn reenterable_schema_change_request(
 
         // Check for conflicts and make the op
         let op = match &params {
+            Params::CreateProcedure(params) => {
+                if params.func_exists() {
+                    // Function already exists, no op needed.
+                    return Ok(ConsumerResult { row_count: 0 });
+                }
+                let id = func_next_reserved_id()?;
+                let ddl = OpDdl::CreateProcedure {
+                    id,
+                    name: params.name.clone(),
+                    params: params.params.clone(),
+                    language: params.language.clone(),
+                    body: params.body.clone(),
+                    security: params.security.clone(),
+                    owner: params.owner,
+                };
+                Op::DdlPrepare {
+                    schema_version,
+                    ddl,
+                }
+            }
             Params::CreateTable(params) => {
                 if params.space_exists()? {
                     // Space already exists, no op needed
@@ -1021,6 +1072,7 @@ fn reenterable_schema_change_request(
         DropRole(String),
         GrantPrivilege(GrantRevokeType, String),
         RevokePrivilege(GrantRevokeType, String),
+        CreateProcedure(CreateProcParams),
     }
 }
 
diff --git a/src/sql/pgproto.rs b/src/sql/pgproto.rs
index 32d88e81c7..0718e7ff1e 100644
--- a/src/sql/pgproto.rs
+++ b/src/sql/pgproto.rs
@@ -242,6 +242,7 @@ pub enum QueryType {
 #[repr(u8)]
 pub enum CommandTag {
     AlterRole = 0,
+    CreateProcedure = 14,
     CreateRole = 1,
     CreateTable = 2,
     DropRole = 3,
@@ -268,7 +269,9 @@ impl From<CommandTag> for QueryType {
             | CommandTag::GrantRole
             | CommandTag::Revoke
             | CommandTag::RevokeRole => QueryType::Acl,
-            CommandTag::DropTable | CommandTag::CreateTable => QueryType::Ddl,
+            CommandTag::DropTable | CommandTag::CreateTable | CommandTag::CreateProcedure => {
+                QueryType::Ddl
+            }
             CommandTag::Delete | CommandTag::Insert | CommandTag::Update => QueryType::Dml,
             CommandTag::Explain => QueryType::Explain,
             CommandTag::Select => QueryType::Dql,
@@ -297,6 +300,7 @@ impl TryFrom<&Node> for CommandTag {
             Node::Ddl(ddl) => match ddl {
                 Ddl::DropTable { .. } => Ok(CommandTag::DropTable),
                 Ddl::CreateTable { .. } => Ok(CommandTag::CreateTable),
+                Ddl::CreateProc { .. } => Ok(CommandTag::CreateProcedure),
             },
             Node::Relational(rel) => match rel {
                 Relational::Delete { .. } => Ok(CommandTag::Delete),
diff --git a/src/storage.rs b/src/storage.rs
index d7afff7974..ed9fe5b98e 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -20,7 +20,7 @@ use crate::replicaset::Replicaset;
 use crate::schema::INITIAL_SCHEMA_VERSION;
 use crate::schema::{Distribution, PrivilegeType, SchemaObjectType};
 use crate::schema::{IndexDef, TableDef};
-use crate::schema::{PrivilegeDef, RoleDef, UserDef};
+use crate::schema::{PrivilegeDef, RoleDef, RoutineDef, UserDef};
 use crate::schema::{ADMIN_ID, PUBLIC_ID, UNIVERSE_ID};
 use crate::sql::pgproto::DEFAULT_MAX_PG_PORTALS;
 use crate::tier::Tier;
@@ -349,6 +349,17 @@ define_clusterwide_tables! {
                 index_name: Index => "name",
             }
         }
+        Routine = 524, "_pico_routine" => {
+            Clusterwide::routines;
+
+            /// A struct for accessing info of all the user-defined routines.
+            pub struct Routines {
+                space: Space,
+                #[primary]
+                index_id: Index => "id",
+                index_name: Index => "name",
+            }
+        }
     }
 }
 
@@ -2098,6 +2109,7 @@ pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
     let sys_space = Space::from(SystemSpace::Space);
     let sys_index = Space::from(SystemSpace::Index);
+    let sys_func = Space::from(SystemSpace::Func);
 
     match *ddl {
         Ddl::CreateTable { id, .. } => {
@@ -2111,6 +2123,11 @@ pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
             // Actual drop happens only on commit, so there's nothing to abort.
         }
 
+        Ddl::CreateProcedure { id, .. } => {
+            sys_func.delete(&[id])?;
+            set_local_schema_version(version)?;
+        }
+
         _ => {
             todo!();
         }
@@ -2119,6 +2136,32 @@ pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
     Ok(())
 }
 
+/// Create tarantool function which throws an error if it's called.
+/// Tarantool function is created with `if_not_exists = true`, so it's
+/// safe to call this rust function multiple times.
+pub fn ddl_create_function_on_master(func_id: u32, func_name: &str) -> traft::Result<()> {
+    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+    let lua = ::tarantool::lua_state();
+    lua.exec_with(
+        r#"
+        local func_id, func_name = ...
+        local def = {
+            language = 'LUA',
+            body = string.format(
+                [[function() error("function %s is used internally by picodata") end]],
+                func_name
+            ),
+            id = func_id,
+            if_not_exists = true,
+        }
+        box.schema.func.create(func_name, def)
+    "#,
+        (func_id, func_name),
+    )
+    .map_err(LuaError::from)?;
+    Ok(())
+}
+
 /// Create tarantool space and any required indexes. Currently it creates a
 /// primary index and a `bucket_id` index if it's a sharded space.
 ///
@@ -2645,6 +2688,94 @@ impl ToEntryIter for Tiers {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Routines
+////////////////////////////////////////////////////////////////////////////////
+impl Routines {
+    pub fn new() -> tarantool::Result<Self> {
+        let space = Space::builder(Self::TABLE_NAME)
+            .id(Self::TABLE_ID)
+            .space_type(SpaceType::DataLocal)
+            .format(Self::format())
+            .if_not_exists(true)
+            .create()?;
+
+        let index_id = space
+            .index_builder("id")
+            .unique(true)
+            .part("id")
+            .if_not_exists(true)
+            .create()?;
+
+        let index_name = space
+            .index_builder("name")
+            .unique(true)
+            .part("name")
+            .if_not_exists(true)
+            .create()?;
+
+        Ok(Self {
+            space,
+            index_id,
+            index_name,
+        })
+    }
+
+    #[inline(always)]
+    pub fn format() -> Vec<tarantool::space::Field> {
+        RoutineDef::format()
+    }
+
+    #[inline(always)]
+    pub fn by_name(&self, routine_name: &str) -> tarantool::Result<Option<RoutineDef>> {
+        let tuple = self.index_name.get(&[routine_name])?;
+        tuple.as_ref().map(Tuple::decode).transpose()
+    }
+
+    #[inline]
+    pub fn by_id(&self, routine_id: u32) -> tarantool::Result<Option<RoutineDef>> {
+        let tuple = self.space.get(&[routine_id])?;
+        tuple.as_ref().map(Tuple::decode).transpose()
+    }
+
+    #[inline]
+    pub fn put(&self, routine: &RoutineDef) -> tarantool::Result<()> {
+        self.space.replace(routine)?;
+        Ok(())
+    }
+
+    #[inline]
+    pub fn delete(&self, routine_id: u32) -> tarantool::Result<()> {
+        self.space.delete(&[routine_id])?;
+        Ok(())
+    }
+
+    #[inline]
+    pub fn update_operable(&self, routine_id: u32, operable: bool) -> tarantool::Result<()> {
+        let mut ops = UpdateOps::with_capacity(1);
+        ops.assign(RoutineDef::FIELD_OPERABLE, operable)?;
+        self.space.update(&[routine_id], ops)?;
+        Ok(())
+    }
+}
+
+impl ToEntryIter for Routines {
+    type Entry = RoutineDef;
+
+    #[inline(always)]
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
+    }
+}
+
+pub fn make_routine_not_found(routine_id: u32) -> tarantool::error::TarantoolError {
+    tarantool::set_error!(
+        tarantool::error::TarantoolErrorCode::TupleNotFound,
+        "routine with id {routine_id} not found",
+    );
+    tarantool::error::TarantoolError::last()
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // SchemaDef
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 3eff4790f5..8d6a26178e 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -14,6 +14,9 @@ use crate::loop_start;
 use crate::reachability::instance_reachability_manager;
 use crate::reachability::InstanceReachabilityManagerRef;
 use crate::rpc;
+use crate::schema::RoutineDef;
+use crate::schema::RoutineKind;
+use crate::schema::SchemaObjectType;
 use crate::schema::{Distribution, IndexDef, TableDef};
 use crate::sentinel;
 use crate::storage::acl;
@@ -987,6 +990,25 @@ impl NodeImpl {
                         );
                     }
 
+                    Ddl::CreateProcedure {
+                        id, name, owner, ..
+                    } => {
+                        self.storage
+                            .routines
+                            .update_operable(id, true)
+                            .expect("storage shouldn't fail");
+
+                        let initiator_def = user_by_id(owner).expect("user must exist");
+
+                        crate::audit!(
+                            message: "created procedure `{name}`",
+                            title: "create_procedure",
+                            severity: Medium,
+                            name: &name,
+                            initiator: initiator_def.name,
+                        );
+                    }
+
                     _ => {
                         todo!()
                     }
@@ -1036,6 +1058,17 @@ impl NodeImpl {
                             .expect("storage shouldn't fail");
                     }
 
+                    Ddl::CreateProcedure { id, .. } => {
+                        self.storage
+                            .privileges
+                            .delete_all_by_object(SchemaObjectType::Routine, id.into())
+                            .expect("storage shouldn't fail");
+                        self.storage
+                            .routines
+                            .delete(id)
+                            .expect("storage shouldn't fail");
+                    }
+
                     _ => {
                         todo!()
                     }
@@ -1321,6 +1354,34 @@ impl NodeImpl {
                 let _ = (index_id, space_id);
                 todo!();
             }
+
+            Ddl::CreateProcedure {
+                id,
+                name,
+                params,
+                language,
+                body,
+                security,
+                owner,
+            } => {
+                let proc_def = RoutineDef {
+                    id,
+                    name,
+                    kind: RoutineKind::Procedure,
+                    params,
+                    returns: vec![],
+                    language,
+                    body,
+                    operable: false,
+                    security,
+                    schema_version,
+                    owner,
+                };
+                self.storage
+                    .routines
+                    .put(&proc_def)
+                    .expect("storage shouldn't fail");
+            }
         }
 
         self.storage
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 5dd9fe4866..0b3d5b428b 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -1,5 +1,6 @@
 use crate::schema::{
-    Distribution, PrivilegeDef, RoleDef, UserDef, ADMIN_ID, GUEST_ID, PUBLIC_ID, SUPER_ID,
+    Distribution, PrivilegeDef, RoleDef, RoutineLanguage, RoutineParams, RoutineSecurity, UserDef,
+    ADMIN_ID, GUEST_ID, PUBLIC_ID, SUPER_ID,
 };
 use crate::storage::space_by_name;
 use crate::storage::Clusterwide;
@@ -104,6 +105,15 @@ impl std::fmt::Display for Op {
                     "DdlPrepare({schema_version}, DropIndex({space_id}, {index_id}))"
                 )
             }
+            Self::DdlPrepare {
+                schema_version,
+                ddl: Ddl::CreateProcedure { id, name, .. },
+            } => {
+                write!(
+                    f,
+                    "DdlPrepare({schema_version}, CreateProcedure({id}, {name}))"
+                )
+            }
             Self::DdlCommit => write!(f, "DdlCommit"),
             Self::DdlAbort => write!(f, "DdlAbort"),
             Self::Acl(Acl::CreateUser { user_def }) => {
@@ -501,6 +511,15 @@ pub enum Ddl {
         space_id: SpaceId,
         index_id: IndexId,
     },
+    CreateProcedure {
+        id: UserId,
+        name: String,
+        params: RoutineParams,
+        language: RoutineLanguage,
+        body: String,
+        security: RoutineSecurity,
+        owner: UserId,
+    },
 }
 
 /// Builder for [`Op::DdlPrepare`] operations.
diff --git a/tarantool b/tarantool
index 5c001ab601..76268529ca 160000
--- a/tarantool
+++ b/tarantool
@@ -1 +1 @@
-Subproject commit 5c001ab601a9f629a649d32a6038c006e7802d3c
+Subproject commit 76268529ca8a202c25ff2a45e50b7c99cf68f866
diff --git a/tarantool-sys b/tarantool-sys
index cbd79b118a..43cdcf7f01 160000
--- a/tarantool-sys
+++ b/tarantool-sys
@@ -1 +1 @@
-Subproject commit cbd79b118a7af785849896d068712660f55e9799
+Subproject commit 43cdcf7f0156f2d84e92ceafe857b0448a48bde4
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index f05280e3f6..7f79b7c6c3 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -290,6 +290,7 @@ def test_raft_log(instance: Instance):
 |  0  | 1  |Insert({_pico_table}, [{_pico_privilege},"_pico_privilege",["global"],[["privilege","string",false],["object_type","string",false],["object_id","integer",false],["grantee_id","unsigned",false],["grantor_id","unsigned",false],["schema_version","unsigned",false]],0,true,"memtx",1])|
 |  0  | 1  |Insert({_pico_table}, [{_pico_role},"_pico_role",["global"],[["id","unsigned",false],["name","string",false],["schema_version","unsigned",false],["owner","unsigned",false]],0,true,"memtx",1])|
 |  0  | 1  |Insert({_pico_table}, [{_pico_tier},"_pico_tier",["global"],[["name","string",false],["replication_factor","unsigned",false]],0,true,"memtx",1])|
+|  0  | 1  |Insert({_pico_table}, [{_pico_routine},"_pico_routine",["global"],[["id","unsigned",false],["name","string",false],["kind","string",false],["params","array",false],["returns","array",false],["language","string",false],["body","string",false],["security","string",false],["operable","boolean",false],["schema_version","unsigned",false],["owner","unsigned",false]],0,true,"memtx",1])|
 |  0  | 1  |AddNode(1)|
 |  0  | 2  |-|
 |  0  | 2  |Replace({_pico_instance}, ["i1","68d4a766-4144-3248-aeb4-e212356716e4",1,"r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07",["Offline",0],["Online",1],{b},"default"])|
@@ -307,6 +308,7 @@ def test_raft_log(instance: Instance):
         _pico_peer_address=space_id("_pico_peer_address"),
         _pico_property=space_id("_pico_property"),
         _pico_replicaset=space_id("_pico_replicaset"),
+        _pico_routine=space_id("_pico_routine"),
         _pico_instance=space_id("_pico_instance"),
         _pico_tier=space_id("_pico_tier"),
         _pico_privilege=space_id("_pico_privilege"),
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index b6b5a51228..f77a0ba5a1 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -2034,3 +2034,90 @@ def test_user_changes_password(cluster: Cluster):
     )
     # ensure we can authenticate with new password
     i1.sql("SELECT * FROM (VALUES (1))", user=user_name, password=new_password)
+
+
+def test_create_procedure(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2)
+
+    data = i1.sql(
+        """
+        create table t (a int not null, b int, primary key (a))
+        using memtx
+        distributed by (b)
+        """
+    )
+    assert data["row_count"] == 1
+
+    # Check that the procedure would be created with the expected id.
+    next_func_id = i1.eval("return box.internal.generate_func_id(true)")
+    data = i2.sql(
+        """
+        create procedure proc1(int)
+        language SQL
+        as $$insert into t values(?, ?)$$
+        """
+    )
+    assert data["row_count"] == 1
+    data = i1.sql(
+        """
+        select "id" from "_pico_routine" where "name" = 'PROC1'
+        """,
+    )
+    assert data["rows"] == [[next_func_id]]
+
+    # Check that recreation of the same procedure is idempotent.
+    data = i2.sql(
+        """
+        create procedure proc1(int)
+        language SQL
+        as $$insert into t values(?, ?)$$
+        """
+    )
+    assert data["row_count"] == 0
+
+    # Check that we can't create a procedure with the same name but different
+    # signature.
+    with pytest.raises(
+        ReturnError, match="routine PROC1 already exists with different parameters"
+    ):
+        i2.sql(
+            """
+            create procedure proc1(int, text)
+            language SQL
+            as $$insert into t values(?, ?)$$
+            """
+        )
+    with pytest.raises(
+        ReturnError, match="routine PROC1 already exists with a different body"
+    ):
+        i2.sql(
+            """
+            create procedure proc1(int)
+            language SQL
+            as $$insert into t values(1, 2)$$
+            """
+        )
+
+    # Check the routine creation abortion on _func space id conflict.
+    i2.eval(
+        """
+        box.schema.func.create(
+            'sum1',
+            {
+                body = [[function(a, b) return a + b end]],
+                id = box.internal.generate_func_id(true)
+            }
+        )
+        """
+    )
+    with pytest.raises(ReturnError, match="ddl operation was aborted"):
+        i1.sql(
+            """
+            create procedure proc2()
+            as $$insert into t values(1, 2)$$
+            """
+        )
+    data = i1.sql(""" select * from "_pico_routine" where "name" = 'PROC2' """)
+    assert data["rows"] == []
+    data = i2.sql(""" select * from "_pico_routine" where "name" = 'PROC2' """)
+    assert data["rows"] == []
-- 
GitLab