Skip to content
Snippets Groups Projects
sql.rs 56.85 KiB
//! Clusterwide SQL query execution.

use crate::access_control::UserMetadataKind;
use crate::schema::{
    wait_for_ddl_commit, CreateProcParams, CreateTableParams, DistributionParam, Field,
    PrivilegeDef, PrivilegeType, RenameRoutineParams, RoutineDef, RoutineLanguage, RoutineParamDef,
    RoutineParams, RoutineSecurity, SchemaObjectType, ShardingFn, UserDef, ADMIN_ID,
};
use crate::sql::pgproto::{
    with_portals_mut, Portal, PortalDescribe, Statement, StatementDescribe, UserPortalNames,
    UserStatementNames, PG_PORTALS, PG_STATEMENTS,
};
use crate::sql::router::RouterRuntime;
use crate::sql::storage::StorageRuntime;
use crate::storage::space_by_name;
use crate::traft::error::Error;
use crate::traft::node::Node as TraftNode;
use crate::traft::op::{Acl as OpAcl, Ddl as OpDdl, Op};
use crate::traft::{self, node};
use crate::util::{duration_from_secs_f64_clamped, effective_user_id};
use crate::{cas, unwrap_ok_or};

use opentelemetry::sdk::trace::Tracer;
use opentelemetry::{baggage::BaggageExt, Context, KeyValue};
use sbroad::backend::sql::ir::{EncodedPatternWithParams, PatternWithParams};
use sbroad::debug;
use sbroad::errors::{Action, Entity, SbroadError};
use sbroad::executor::engine::helpers::{decode_msgpack, normalize_name_for_space_api};
use sbroad::executor::engine::{QueryCache, Router, TableVersionMap};
use sbroad::executor::lru::Cache;
use sbroad::executor::protocol::{EncodedRequiredData, RequiredData};
use sbroad::executor::result::ConsumerResult;
use sbroad::executor::Query;
use sbroad::frontend::Ast;
use sbroad::ir::acl::{Acl, AlterOption, GrantRevokeType, Privilege as SqlPrivilege};
use sbroad::ir::block::Block;
use sbroad::ir::ddl::{Ddl, ParamDef};
use sbroad::ir::expression::Expression;
use sbroad::ir::operator::Relational;
use sbroad::ir::relation::Type;
use sbroad::ir::tree::traversal::{PostOrderWithFilter, REL_CAPACITY};
use sbroad::ir::value::{LuaValue, Value};
use sbroad::ir::{Node as IrNode, Plan as IrPlan};
use sbroad::otm::{query_id, query_span, OTM_CHAR_LIMIT};
use serde::Deserialize;
use tarantool::access_control::{box_access_check_ddl, SchemaObjectType as TntSchemaObjectType};
use tarantool::schema::function::func_next_reserved_id;

use self::pgproto::{ClientId, Oid};
use crate::storage::Clusterwide;
use ::tarantool::access_control::{box_access_check_space, PrivType};
use ::tarantool::auth::{AuthData, AuthDef, AuthMethod};
use ::tarantool::proc;
use ::tarantool::session::{with_su, UserId};
use ::tarantool::space::{FieldType, Space, SpaceId, SystemSpace};
use ::tarantool::time::Instant;
use ::tarantool::tuple::{RawBytes, Tuple};
use std::rc::Rc;
use std::str::FromStr;
use tarantool::session;

pub mod otm;

pub mod pgproto;
pub mod router;
pub mod storage;
use otm::TracerKind;

/// Default bucket count.
// NOTE(review): presumably the default number of sharding buckets; the
// constant is not used in this part of the file — confirm against callers.
pub const DEFAULT_BUCKET_COUNT: u64 = 3000;
/// Characters treated as "special" by `validate_password` when the password
/// policy requires at least one special character.
const SPECTIAL_CHARACTERS: [char; 6] = ['&', '|', '?', '!', '$', '@'];

/// Kind of table access required by a plan node, as collected by
/// `check_table_privileges`.
enum Privileges {
    /// Read access only (used for relation scans).
    Read,
    /// Write access only (used for inserts).
    Write,
    /// Both read and write access (used for updates and deletes).
    ReadWrite,
}

fn check_table_privileges(plan: &IrPlan) -> traft::Result<()> {
    let filter = |node_id: usize| -> bool {
        if let Ok(IrNode::Relational(
            Relational::ScanRelation { .. }
            | Relational::Delete { .. }
            | Relational::Insert { .. }
            | Relational::Update { .. },
        )) = plan.get_node(node_id)
        {
            return true;
        }
        false
    };
    let mut plan_traversal = PostOrderWithFilter::with_capacity(
        |node| plan.subtree_iter(node, false),
        REL_CAPACITY,
        Box::new(filter),
    );
    let top_id = plan.get_top().map_err(Error::from)?;
    plan_traversal.populate_nodes(top_id);
    let nodes = plan_traversal.take_nodes();

    // We don't want to switch the user back and forth for each node, so we
    // collect all space ids and privileges and then check them all at once.
    let mut space_privs: Vec<(SpaceId, Privileges)> = Vec::with_capacity(nodes.len());

    // Switch to admin to get space ids. At the moment we don't use space cache in tarantool
    // module and can't get space metadata without _space table read permissions.
    with_su(ADMIN_ID, || -> traft::Result<()> {
        for (_, node_id) in nodes {
            let rel_node = plan.get_relation_node(node_id).map_err(Error::from)?;
            let (relation, privileges) = match rel_node {
                Relational::ScanRelation { relation, .. } => (relation, Privileges::Read),
                Relational::Insert { relation, .. } => (relation, Privileges::Write),
                Relational::Delete { relation, .. } | Relational::Update { relation, .. } => {
                    // We check write and read privileges for deletes and updates.
                    //
                    // Write: Picodata doesn't support delete and update privileges,
                    // so we grant write access instead.
                    //
                    // Read: SQL standard says that update and delete statements
                    // should check for read access when they contain a where
                    // clause (to protect from information leaks). But we don't
                    // expect that updates and deletes would be used without a
                    // where clause (long operations are not good for Picodata).
                    // So, let's make it simple and avoid special cases.
                    (relation, Privileges::ReadWrite)
                }
                // This should never happen as we have filtered out all other plan nodes.
                _ => unreachable!("internal bug on the table privilege check"),
            };
            let space_name = normalize_name_for_space_api(relation);
            let space = space_by_name(&space_name).map_err(Error::from)?;
            space_privs.push((space.id(), privileges))
        }
        Ok(())
    })??;
    for (space_id, priviledges) in space_privs {
        match priviledges {
            Privileges::Read => {
                box_access_check_space(space_id, PrivType::Read).map_err(Error::from)?;
            }
            Privileges::Write => {
                box_access_check_space(space_id, PrivType::Write).map_err(Error::from)?;
            }
            Privileges::ReadWrite => {
                box_access_check_space(space_id, PrivType::Read).map_err(Error::from)?;
                box_access_check_space(space_id, PrivType::Write).map_err(Error::from)?;
            }
        }
    }
    Ok(())
}

fn routine_by_name(name: &str) -> traft::Result<RoutineDef> {
    // Switch to admin to get procedure definition.
    with_su(ADMIN_ID, || -> traft::Result<RoutineDef> {
        let storage = &node::global()?.storage;
        let routine = storage
            .routines
            .by_name(name)
            .map_err(Error::from)?
            .ok_or_else(|| {
                Error::Sbroad(SbroadError::Invalid(
                    Entity::Routine,
                    Some(format!("routine {name} not found")),
                ))
            })?;
        Ok(routine)
    })?
}

/// Checks that the current user may execute the procedure called by the plan.
///
/// Does nothing when the top node of the plan is not a procedure block.
fn check_routine_privileges(plan: &IrPlan) -> traft::Result<()> {
    // Nested procedure calls are not supported at the moment, so the top
    // node is the only place where a procedure can appear in the plan.
    let top_id = plan.get_top().map_err(Error::from)?;
    let name = match plan.get_block_node(top_id) {
        Ok(Block::Procedure { name, .. }) => name,
        // No procedure in the plan tree: nothing to check.
        _ => return Ok(()),
    };

    let routine = routine_by_name(name)?;
    box_access_check_ddl(
        name,
        routine.id,
        routine.owner,
        TntSchemaObjectType::Function,
        PrivType::Execute,
    )?;
    Ok(())
}

/// Executes an already built query and returns its result as a tuple.
///
/// Three kinds of queries are handled:
/// * DDL / ACL: the top plan node is taken out of the plan and passed to
///   `reenterable_schema_change_request`; the result is wrapped into a
///   single-field tuple.
/// * Block (procedure call): execute privilege, argument count and argument
///   types are checked, then a new query is built from the routine body with
///   the passed constants as parameters and dispatched recursively.
/// * Anything else: table privileges are checked and the query is dispatched
///   through `Query::dispatch`; the result must downcast to a `Tuple`.
fn dispatch(mut query: Query<RouterRuntime>) -> traft::Result<Tuple> {
    if query.is_ddl().map_err(Error::from)? || query.is_acl().map_err(Error::from)? {
        let ir_plan = query.get_exec_plan().get_ir_plan();
        let top_id = ir_plan.get_top().map_err(Error::from)?;
        let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan();

        // XXX: add Node::take_node method to simplify the following 2 lines
        let ir_node = ir_plan_mut.get_mut_node(top_id).map_err(Error::from)?;
        // Move the node out of the plan, leaving a placeholder in its place.
        let ir_node = std::mem::replace(ir_node, IrNode::Parameter);
        let node = node::global()?;
        let result = reenterable_schema_change_request(node, ir_node)?;
        Tuple::new(&(result,)).map_err(Error::from)
    } else if query.is_block().map_err(Error::from)? {
        // The privilege check happens before the block is taken out of the plan.
        check_routine_privileges(query.get_exec_plan().get_ir_plan())?;
        let ir_plan = query.get_mut_exec_plan().get_mut_ir_plan();
        let top_id = ir_plan.get_top().map_err(Error::from)?;
        let code_block = ir_plan.get_mut_block_node(top_id).map_err(Error::from)?;
        let code_block = std::mem::take(code_block);
        match code_block {
            Block::Procedure { name, values } => {
                let routine = routine_by_name(&name)?;
                // Check that the amount of passed values is correct.
                if routine.params.len() != values.len() {
                    return Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Routine,
                        Some(format!(
                            "expected {} parameter(s), got {}",
                            routine.params.len(),
                            values.len(),
                        )),
                    )));
                }
                // XXX: at the moment we don't support multiple SQL statements in a block.
                // So, we can safely assume that the procedure body contains only one statement
                // and call it directly.
                let pattern = routine.body;
                let mut params: Vec<Value> = Vec::with_capacity(values.len());
                for (pos, value_id) in values.into_iter().enumerate() {
                    let constant_node = ir_plan.get_mut_node(value_id).map_err(Error::from)?;
                    // Move the argument node out of the plan to take its value without cloning.
                    let constant_node = std::mem::replace(constant_node, IrNode::Parameter);
                    let value = match constant_node {
                        IrNode::Expression(Expression::Constant { value, .. }) => value,
                        _ => {
                            return Err(Error::Sbroad(SbroadError::Invalid(
                                Entity::Expression,
                                Some(format!("expected constant, got {constant_node:?}")),
                            )))
                        }
                    };
                    // We have already checked the amount of passed values, so we can
                    // safely assume that the parameter exists at the given position.
                    let param_def = &routine.params[pos];
                    let param_type = Type::try_from(param_def.r#type).map_err(Error::from)?;
                    // Check that the value has a correct type.
                    if !value.get_type().is_castable_to(&param_type) {
                        return Err(Error::Sbroad(SbroadError::Invalid(
                            Entity::Routine,
                            Some(format!(
                                "expected {} for parameter on position {pos}, got {}",
                                param_def.r#type,
                                value.get_type(),
                            )),
                        )));
                    }
                    params.push(value);
                }
                let runtime = RouterRuntime::new().map_err(Error::from)?;
                // The query for the routine body is built with admin privileges.
                let mut stmt_query =
                    with_su(ADMIN_ID, || -> traft::Result<Query<RouterRuntime>> {
                        Query::new(&runtime, &pattern, params).map_err(Error::from)
                    })??;
                // Take options from the original query.
                let options = std::mem::take(&mut ir_plan.raw_options);
                let stmt_ir_plan = stmt_query.get_mut_exec_plan().get_mut_ir_plan();
                stmt_ir_plan.raw_options = options;
                // Execute the routine body as a regular query.
                dispatch(stmt_query)
            }
        }
    } else {
        let plan = query.get_exec_plan().get_ir_plan();
        check_table_privileges(plan)?;
        match query.dispatch() {
            Ok(mut any_tuple) => {
                if let Some(tuple) = any_tuple.downcast_mut::<Tuple>() {
                    debug!(
                        Option::from("dispatch"),
                        &format!("Dispatch result: {tuple:?}"),
                    );
                    // Move the tuple out of the type-erased result, leaving an empty one behind.
                    let tuple: Tuple = std::mem::replace(tuple, Tuple::new(&())?);
                    Ok(tuple)
                } else {
                    Err(Error::from(SbroadError::FailedTo(
                        Action::Decode,
                        None,
                        format!("tuple {any_tuple:?}"),
                    )))
                }
            }
            Err(e) => Err(Error::from(e)),
        }
    }
}

/// Returns a context derived from `ctx` whose baggage carries the string
/// form of `tracer_kind` under `TRACER_KEY`.
#[inline]
pub fn with_tracer(ctx: Context, tracer_kind: TracerKind) -> Context {
    let tracer_entry = KeyValue::new(TRACER_KEY, tracer_kind.to_string());
    ctx.with_baggage(vec![tracer_entry])
}

/// Dispatches a query to the cluster.
///
/// Decodes the pattern with parameters, builds the query with admin
/// privileges and executes it via `dispatch` inside a
/// `"api.router.dispatch"` query span.
///
/// When the request carries a tracer name, it is parsed into a `TracerKind`
/// and stored in the context baggage; otherwise the default tracer kind is used.
#[proc(packed_args)]
pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result<Tuple> {
    let mut params = PatternWithParams::try_from(encoded_params).map_err(Error::from)?;
    let id = params.clone_id();
    let mut ctx = params.extract_context();
    let mut tracer_kind = TracerKind::default();

    if let Some(tracer_kind_str) = params.tracer.as_ref() {
        tracer_kind =
            TracerKind::from_str(tracer_kind_str).map_err(|e| Error::Other(Box::new(e)))?;
        ctx = with_tracer(ctx, tracer_kind);
    }

    // The actual work; it is run by `query_span` below.
    let dispatch = || {
        let runtime = RouterRuntime::new().map_err(Error::from)?;
        // Consumes `params.params`, while `params.pattern` is only borrowed.
        let build_query =
            || Query::new(&runtime, &params.pattern, params.params).map_err(Error::from);

        // The query is built as admin.
        let query = with_su(ADMIN_ID, || -> traft::Result<Query<RouterRuntime>> {
            build_query()
        })??;
        dispatch(query)
    };

    query_span::<Result<Tuple, Error>, _>(
        "\"api.router.dispatch\"",
        &id,
        tracer_kind.get_tracer(),
        &ctx,
        &params.pattern,
        dispatch,
    )
}

/// Arguments of `proc_pg_bind`.
struct BindArgs {
    /// Client the statement and the portal belong to.
    id: ClientId,
    /// Name of the previously parsed statement to bind.
    stmt_name: String,
    /// Name under which the resulting portal is stored.
    portal_name: String,
    /// Values bound to the statement parameters.
    params: Vec<Value>,
    /// Passed to `Portal::new` as the output format.
    encoding_format: Vec<u8>,
    /// Selects the tracer kind used for the bind span.
    traceable: bool,
}

/// Decodes `BindArgs` from a six-element sequence.
///
/// Missing parameters are treated as an empty list and a missing
/// `traceable` flag as `false`.
impl<'de> Deserialize<'de> for BindArgs {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Wire representation: optional fields may be absent.
        #[derive(Deserialize)]
        struct EncodedBindArgs(
            ClientId,
            String,
            String,
            Option<Vec<LuaValue>>,
            Vec<u8>,
            Option<bool>,
        );

        let EncodedBindArgs(id, stmt_name, portal_name, raw_params, encoding_format, trace_flag) =
            EncodedBindArgs::deserialize(deserializer)?;

        let params: Vec<Value> = match raw_params {
            Some(values) => values.into_iter().map(Value::from).collect(),
            None => Vec::new(),
        };
        let traceable = trace_flag.unwrap_or(false);

        Ok(Self {
            id,
            stmt_name,
            portal_name,
            params,
            encoding_format,
            traceable,
        })
    }
}

/// Returns the tracer that corresponds to the `traceable` flag.
fn get_tracer_param(traceable: bool) -> &'static Tracer {
    TracerKind::from_traceable(traceable).get_tracer()
}

/// Binds parameters to a previously parsed statement and stores the result
/// as a portal under `(id, portal_name)`.
///
/// For plans that are neither DDL nor ACL the parameters are bound, the
/// options are applied and the plan is optimized before the portal is
/// created. Fails when the statement is not found.
#[proc(packed_args)]
pub fn proc_pg_bind(args: BindArgs) -> traft::Result<()> {
    let BindArgs {
        id,
        stmt_name,
        portal_name,
        params,
        encoding_format,
        traceable,
    } = args;

    let key = (id, stmt_name.into());
    let statement = match PG_STATEMENTS.with(|statements| statements.borrow().get(&key)) {
        Some(statement) => statement,
        None => {
            return Err(Error::Other(
                format!("Couldn't find statement \'{}\'.", key.1).into(),
            ))
        }
    };

    // The portal owns its own copy of the statement plan.
    let mut plan = statement.plan().clone();
    let ctx = with_tracer(Context::new(), TracerKind::from_traceable(traceable));
    let portal = query_span::<traft::Result<_>, _>(
        "\"api.router.bind\"",
        statement.id(),
        get_tracer_param(traceable),
        &ctx,
        statement.query_pattern(),
        || {
            let is_schema_change = plan.is_ddl()? || plan.is_acl()?;
            if !is_schema_change {
                plan.bind_params(params)?;
                plan.apply_options()?;
                plan.optimize()?;
            }
            Portal::new(plan, statement.clone(), encoding_format)
        },
    )?;

    PG_PORTALS.with(|portals| portals.borrow_mut().put((id, portal_name.into()), portal))?;
    Ok(())
}
/// Builds a `UserStatementNames` value for the client with the given id.
// NOTE(review): presumably this lists the names of the client's prepared
// statements — confirm against `UserStatementNames::new`.
#[proc]
pub fn proc_pg_statements(id: ClientId) -> UserStatementNames {
    UserStatementNames::new(id)
}

/// Builds a `UserPortalNames` value for the client with the given id.
// NOTE(review): presumably this lists the names of the client's portals —
// confirm against `UserPortalNames::new`.
#[proc]
pub fn proc_pg_portals(id: ClientId) -> UserPortalNames {
    UserPortalNames::new(id)
}

/// Removes the statement stored under `(id, name)` from `PG_STATEMENTS`.
///
/// The result of the removal is discarded, so no error is reported here.
#[proc]
pub fn proc_pg_close_stmt(id: ClientId, name: String) {
    // Close can't cause an error in PG.
    PG_STATEMENTS.with(|storage| storage.borrow_mut().remove(&(id, name.into())));
}

/// Removes the portal stored under `(id, name)` from `PG_PORTALS`.
///
/// The result of the removal is discarded, so no error is reported here.
#[proc]
pub fn proc_pg_close_portal(id: ClientId, name: String) {
    // Close can't cause an error in PG.
    PG_PORTALS.with(|storage| storage.borrow_mut().remove(&(id, name.into())));
}

/// Removes all statements of the client with the given id from `PG_STATEMENTS`.
#[proc]
pub fn proc_pg_close_client_stmts(id: ClientId) {
    PG_STATEMENTS.with(|storage| storage.borrow_mut().remove_by_client_id(id))
}

/// Removes all portals of the client with the given id from `PG_PORTALS`.
#[proc]
pub fn proc_pg_close_client_portals(id: ClientId) {
    PG_PORTALS.with(|storage| storage.borrow_mut().remove_by_client_id(id))
}

#[proc]
pub fn proc_pg_describe_stmt(id: ClientId, name: String) -> Result<StatementDescribe, Error> {
    let key = (id, name.into());
    let Some(statement) = PG_STATEMENTS.with(|storage| storage.borrow().get(&key)) else {
        return Err(Error::Other(
            format!("Couldn't find statement \'{}\'.", key.1).into(),
        ));
    };
    Ok(statement.describe().clone())
}

/// Returns a copy of the description of the portal stored under `(id, name)`.
///
/// The portal lookup, and any error it produces, is delegated to `with_portals_mut`.
#[proc]
pub fn proc_pg_describe_portal(id: ClientId, name: String) -> traft::Result<PortalDescribe> {
    with_portals_mut((id, name.into()), |portal| Ok(portal.describe().clone()))
}

/// Executes the portal stored under `(id, name)` inside an
/// `"api.router.execute"` query span and returns the resulting tuple.
///
/// A non-positive `max_rows` is replaced with `i64::MAX` before it is
/// passed to the portal.
#[proc]
pub fn proc_pg_execute(
    id: ClientId,
    name: String,
    max_rows: i64,
    traceable: bool,
) -> traft::Result<Tuple> {
    let row_limit = match max_rows {
        limit if limit > 0 => limit,
        _ => i64::MAX,
    };
    let portal_name = Rc::from(name);

    // The statement is needed for the span attributes; only an Rc is cloned here.
    let statement = with_portals_mut((id, Rc::clone(&portal_name)), |portal| {
        Ok(portal.statement().clone())
    })?;

    with_portals_mut((id, portal_name), |portal| {
        let tracer_kind = TracerKind::from_traceable(traceable);
        let ctx = with_tracer(Context::new(), tracer_kind);
        query_span::<traft::Result<Tuple>, _>(
            "\"api.router.execute\"",
            statement.id(),
            get_tracer_param(traceable),
            &ctx,
            statement.query_pattern(),
            || portal.execute(row_limit as usize),
        )
    })
}

/// Parses `query` and stores the resulting statement under `(cid, name)`
/// in `PG_STATEMENTS`.
///
/// The plan is taken from the runtime query cache when possible. Otherwise
/// the query is transformed into a plan with admin privileges and, unless it
/// is a DDL or ACL plan, put into the cache. The whole operation runs inside
/// an `"api.router.parse"` query span.
///
/// # Errors
///
/// Fails when the runtime or its cache is not available, when the query can
/// not be transformed into a plan, or when the statement can not be created
/// or stored.
#[proc]
pub fn proc_pg_parse(
    cid: ClientId,
    name: String,
    query: String,
    param_oids: Vec<Oid>,
    traceable: bool,
) -> traft::Result<()> {
    let id = query_id(&query);
    // Keep the query patterns for opentelemetry spans short enough.
    // Byte offsets produced by `char_indices` only grow, so it is safe to
    // stop at the first character past the limit instead of scanning the
    // rest of a (possibly very long) query.
    let sql = query
        .char_indices()
        .take_while(|(offset, _)| *offset <= OTM_CHAR_LIMIT)
        .map(|(_, ch)| ch)
        .collect::<String>();
    let ctx = with_tracer(Context::new(), TracerKind::from_traceable(traceable));
    query_span::<traft::Result<()>, _>(
        "\"api.router.parse\"",
        // `id` and `sql` are moved into the closure below, so the span gets copies.
        &id.clone(),
        get_tracer_param(traceable),
        &ctx,
        &sql.clone(),
        || {
            let runtime = RouterRuntime::new().map_err(Error::from)?;
            let mut cache = runtime
                .cache()
                .try_borrow_mut()
                .map_err(|e| Error::Other(format!("runtime query cache: {e:?}").into()))?;
            // Fast path: the plan for this query is already cached.
            if let Some(plan) = cache.get(&query)? {
                let statement = Statement::new(id, sql.clone(), plan.clone(), param_oids)?;
                PG_STATEMENTS
                    .with(|cache| cache.borrow_mut().put((cid, name.into()), statement))?;
                return Ok(());
            }
            let metadata = &*runtime.metadata().map_err(Error::from)?;
            // The plan is built as admin.
            let plan = with_su(ADMIN_ID, || -> traft::Result<IrPlan> {
                let mut plan =
                    <RouterRuntime as Router>::ParseTree::transform_into_plan(&query, metadata)
                        .map_err(Error::from)?;
                // Remember the versions of all tables referenced by the plan.
                if runtime.provides_versions() {
                    let mut table_version_map =
                        TableVersionMap::with_capacity(plan.relations.tables.len());
                    for table in plan.relations.tables.keys() {
                        let normalized = normalize_name_for_space_api(table);
                        let version = runtime
                            .get_table_version(normalized.as_str())
                            .map_err(Error::from)?;
                        table_version_map.insert(normalized, version);
                    }
                    plan.version_map = table_version_map;
                }
                Ok(plan)
            })??;
            // DDL and ACL plans are never cached.
            if !plan.is_ddl()? && !plan.is_acl()? {
                cache.put(query, plan.clone())?;
            }
            let statement = Statement::new(id, sql, plan, param_oids)?;
            PG_STATEMENTS
                .with(|storage| storage.borrow_mut().put((cid, name.into()), statement))?;
            Ok(())
        },
    )
}

/// Conversion of an SQL privilege into a Picodata privilege type.
///
/// `Session` and `Usage` have no counterpart and produce an
/// `SbroadError::Unsupported` error.
impl TryFrom<&SqlPrivilege> for PrivilegeType {
    type Error = SbroadError;

    fn try_from(item: &SqlPrivilege) -> Result<Self, Self::Error> {
        // Picodata does not allow to grant or revoke session or usage.
        // Instead this should be done through alter user with login/nologin.
        let unsupported =
            |what: &str| SbroadError::Unsupported(Entity::Privilege, Some(what.into()));

        let privilege = match item {
            SqlPrivilege::Read => PrivilegeType::Read,
            SqlPrivilege::Write => PrivilegeType::Write,
            SqlPrivilege::Execute => PrivilegeType::Execute,
            SqlPrivilege::Create => PrivilegeType::Create,
            SqlPrivilege::Alter => PrivilegeType::Alter,
            SqlPrivilege::Drop => PrivilegeType::Drop,
            SqlPrivilege::Session => return Err(unsupported("session")),
            SqlPrivilege::Usage => return Err(unsupported("usage")),
        };
        Ok(privilege)
    }
}

impl TraftNode {
    /// Helper method to retrieve next id for newly created user/role.
    fn get_next_grantee_id(&self) -> traft::Result<UserId> {
        let storage = &self.storage;
        let max_user_id = storage.users.max_user_id()?;
        if let Some(max_user_id) = max_user_id {
            return Ok(max_user_id + 1);
        }

        let max_tarantool_user_id: UserId = Space::from(SystemSpace::User)
            .index("primary")
            .expect("_user should have a primary index")
            .max(&())?
            .expect("_user must contain at least one row")
            .get(0)
            .expect("_user rows must contain id column");
        Ok(max_tarantool_user_id + 1)
    }

    /// Get table id by its name.
    /// Returns:
    /// * `Some(table_id)`` in case such table exists
    /// * `None` in case such table doesn't exist
    fn get_table_id(&self, table_name: &String) -> Option<u32> {
        let table = Space::from(SystemSpace::Space)
            .index("name")
            .expect("_space should have a name index")
            .get(&(table_name,))
            .expect("name index selection from _space should succeed");
        if let Some(table) = table {
            let table_id = table.get(0).expect("_space rows must contain id column");
            Some(table_id)
        } else {
            None
        }
    }

    /// Get user or role id by its name.
    /// Returns:
    /// * `Some(user_or_role_id)`` in case such user or role exists
    /// * `None` in case such user or role doesn't exist
    fn get_user_or_role_id(&self, user_or_role_name: &String) -> Option<UserId> {
        let user_or_role = Space::from(SystemSpace::VUser)
            .index("name")
            .expect("_vuser should have a name index")
            .get(&(user_or_role_name,))
            .expect("name index selection from _vuser should succeed");
        if let Some(user_or_role) = user_or_role {
            let user_or_role_id = user_or_role
                .get(0)
                .expect("_vuser rows must contain id column");
            Some(user_or_role_id)
        } else {
            None
        }
    }

    /// Get (object_type, privilege_type, object_id) data from `GrantRevokeType`.
    fn object_resolve(
        &self,
        grant_revoke_type: &GrantRevokeType,
    ) -> traft::Result<(SchemaObjectType, PrivilegeType, i64)> {
        match grant_revoke_type {
            GrantRevokeType::User { privilege } => {
                Ok((SchemaObjectType::User, privilege.try_into()?, -1))
            }
            GrantRevokeType::SpecificUser {
                privilege,
                user_name,
            } => {
                if let Some(user_id) = self.get_user_or_role_id(user_name) {
                    Ok((
                        SchemaObjectType::User,
                        privilege.try_into()?,
                        user_id as i64,
                    ))
                } else {
                    Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("There is no user with name {user_name}")),
                    )))
                }
            }
            GrantRevokeType::Role { privilege } => {
                Ok((SchemaObjectType::Role, privilege.try_into()?, -1))
            }
            GrantRevokeType::SpecificRole {
                privilege,
                role_name,
            } => {
                if let Some(role_id) = self.get_user_or_role_id(role_name) {
                    Ok((
                        SchemaObjectType::Role,
                        privilege.try_into()?,
                        role_id as i64,
                    ))
                } else {
                    Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("There is no role with name {role_name}")),
                    )))
                }
            }
            GrantRevokeType::Table { privilege } => {
                Ok((SchemaObjectType::Table, privilege.try_into()?, -1))
            }
            GrantRevokeType::SpecificTable {
                privilege,
                table_name,
            } => {
                if let Some(table_id) = self.get_table_id(table_name) {
                    Ok((
                        SchemaObjectType::Table,
                        privilege.try_into()?,
                        table_id as i64,
                    ))
                } else {
                    Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("There is no table with name {table_name}")),
                    )))
                }
            }
            GrantRevokeType::Procedure { privilege } => {
                Ok((SchemaObjectType::Routine, privilege.try_into()?, -1))
            }
            GrantRevokeType::SpecificProcedure {
                privilege,
                proc_name,
                proc_params,
            } => {
                if let Some(routine) = self.storage.routines.by_name(proc_name)? {
                    if let Some(params) = proc_params.as_ref() {
                        ensure_parameters_match(&routine, params)?;
                    }
                    Ok((
                        SchemaObjectType::Routine,
                        privilege.try_into()?,
                        routine.id as i64,
                    ))
                } else {
                    Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("There is no routine with name {proc_name}")),
                    )))
                }
            }
            GrantRevokeType::RolePass { role_name } => {
                if let Some(role_id) = self.get_user_or_role_id(role_name) {
                    Ok((
                        SchemaObjectType::Role,
                        PrivilegeType::Execute,
                        role_id as i64,
                    ))
                } else {
                    Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("There is no role with name {role_name}")),
                    )))
                }
            }
        }
    }
}

/// Checks a password against the cluster password policy.
///
/// Passwords for the LDAP authentication method are not checked at all.
/// For other methods the following properties are read from the storage
/// (as admin) and enforced in this order: minimal length, presence of an
/// uppercase letter, a lowercase letter, a digit and a special character
/// (one of `SPECTIAL_CHARACTERS`).
///
/// The length is measured in characters (Unicode scalar values) rather than
/// bytes, so that a short password made of multi-byte characters can not
/// satisfy the minimal length requirement.
///
/// # Errors
///
/// Returns an error when a property can not be read or when the password
/// violates one of the enabled requirements.
fn validate_password(
    password: &str,
    auth_method: &AuthMethod,
    node: &TraftNode,
) -> traft::Result<()> {
    if let AuthMethod::Ldap = auth_method {
        // LDAP doesn't need password for authentication
        return Ok(());
    }

    let storage = &node.storage;

    // This check is called from user facing API.
    // A user is not expected to have access to _pico_property
    let password_min_length =
        session::with_su(ADMIN_ID, || storage.properties.password_min_length())??;
    // `str::len` returns the length in bytes, which overstates the length of
    // passwords with non-ASCII characters.
    let password_length = password.chars().count();
    if password_length < password_min_length {
        return Err(Error::Other(
            format!(
                "password is too short: expected at least {}, got {}",
                password_min_length, password_length
            )
            .into(),
        ));
    }

    let password_enforce_uppercase =
        session::with_su(ADMIN_ID, || storage.properties.password_enforce_uppercase())??;
    if password_enforce_uppercase && !password.chars().any(|ch| ch.is_uppercase()) {
        return Err(Error::Other(
            "invalid password: password should contains at least one uppercase letter".into(),
        ));
    }

    let password_enforce_lowercase =
        session::with_su(ADMIN_ID, || storage.properties.password_enforce_lowercase())??;
    if password_enforce_lowercase && !password.chars().any(|ch| ch.is_lowercase()) {
        return Err(Error::Other(
            "invalid password: password should contains at least one lowercase letter".into(),
        ));
    }

    let password_enforce_digits =
        session::with_su(ADMIN_ID, || storage.properties.password_enforce_digits())??;
    if password_enforce_digits && !password.chars().any(|ch| ch.is_ascii_digit()) {
        return Err(Error::Other(
            "invalid password: password should contains at least one digit".into(),
        ));
    }

    let password_enforce_specialchars = session::with_su(ADMIN_ID, || {
        storage.properties.password_enforce_specialchars()
    })??;
    if password_enforce_specialchars
        && !password.chars().any(|ch| SPECTIAL_CHARACTERS.contains(&ch))
    {
        return Err(Error::Other(
            format!(
                "invalid password: password should contains at least one special character - {:?}",
                SPECTIAL_CHARACTERS
            )
            .into(),
        ));
    }

    Ok(())
}

/// Get grantee (user or role) UserId by its name.
fn get_grantee_id(storage: &Clusterwide, grantee_name: &String) -> traft::Result<UserId> {
    if let Some(grantee_user_def) = storage.users.by_name(grantee_name)? {
        Ok(grantee_user_def.id)
    } else {
        // No existing user or role found.
        Err(Error::Sbroad(SbroadError::Invalid(
            Entity::Acl,
            Some(format!(
                "Nor user, neither role with name {grantee_name} exists"
            )),
        )))
    }
}

/// Tells whether the storage already contains the given privilege of the
/// grantee on the given object.
fn check_privilege_already_granted(
    node: &TraftNode,
    grantee_id: UserId,
    object_type: &str,
    object_id: i64,
    privilege: &str,
) -> traft::Result<bool> {
    let existing = node
        .storage
        .privileges
        .get(grantee_id, object_type, object_id, privilege)?;
    Ok(existing.is_some())
}

/// Verify that the parameter list given in a statement matches the stored
/// signature of `routine`.
///
/// The signatures match when both lists have the same length and every
/// position has the same field type.
///
/// # Errors
///
/// Returns `Error::Other` carrying the routine's actual signature when the
/// lists differ.
fn ensure_parameters_match(routine: &RoutineDef, params: &[ParamDef]) -> traft::Result<()> {
    let same_arity = routine.params.len() == params.len();
    // Types are compared only when the arity already agrees.
    let same_types = || {
        routine
            .params
            .iter()
            .zip(params)
            .all(|(expected, given)| expected.r#type == FieldType::from(&given.data_type))
    };
    if same_arity && same_types() {
        return Ok(());
    }

    // Render the stored signature so that the user can see what was expected.
    let stored_types = routine
        .params
        .iter()
        .map(|def| def.r#type.as_str())
        .collect::<Vec<_>>();
    let actual_signature = format!("{}({})", routine.name, stored_types.join(", "));

    Err(Error::Other(
        format!(
            "routine exists but with a different signature: {}",
            actual_signature
        )
        .into(),
    ))
}

/// Executes a DDL or ACL statement as a schema change proposed via
/// compare-and-swap, retrying until it is applied, turns out to be a no-op,
/// fails, or the statement timeout expires.
///
/// The work is done in two phases:
///
/// 1. The IR node is consumed and converted into the local `Params` enum.
///    Validation that is performed here (procedure and table parameters,
///    auth method and password) happens once, before the retry loop.
/// 2. In the `'retry` loop the current state is inspected, an [`Op`] is built
///    from `Params` and submitted with `cas::compare_and_swap`. The loop is
///    restarted when a schema change is already pending, when the CAS error
///    is retriable, or when the term of the entry has changed after waiting.
///
/// # Returns
///
/// `ConsumerResult { row_count: 0 }` when the requested state is already in
/// place and no operation is needed, `ConsumerResult { row_count: 1 }` after
/// the operation went through.
///
/// # Errors
///
/// * `Error::Timeout` when the deadline has passed at the start of an iteration.
/// * Validation errors from the first phase and conflict errors from the
///   second phase (e.g. a name is already taken by an object of another kind).
/// * Any non-retriable error of `cas::compare_and_swap`, as well as errors of
///   the read-index / wait-index / storage calls.
///
/// # Panics
///
/// Panics if `ir_node` is not one of the DDL or ACL nodes handled below, if
/// switching the session to admin fails, or if one of the `expect`ed storage
/// invariants in the body does not hold.
fn reenterable_schema_change_request(
    node: &TraftNode,
    ir_node: IrNode,
) -> traft::Result<ConsumerResult> {
    let storage = &node.storage;
    // Save current user as later user is switched to admin
    let current_user = effective_user_id();

    // The timeout comes from the statement itself; everything below is bounded
    // by the single `deadline` computed from it.
    let timeout = match &ir_node {
        IrNode::Ddl(ddl) => ddl.timeout()?,
        IrNode::Acl(acl) => acl.timeout()?,
        n => {
            unreachable!("this function should only be called for ddl or acl nodes, not {n:?}")
        }
    };
    let timeout = duration_from_secs_f64_clamped(timeout);
    let deadline = Instant::now().saturating_add(timeout);

    // Check parameters and convert the IR node into `Params`, which the retry
    // loop below only inspects by reference.
    let params = match ir_node {
        IrNode::Ddl(Ddl::CreateProc {
            name,
            params: args,
            body,
            language,
            ..
        }) => {
            // Only the type of each parameter is taken from the IR definition.
            let args: RoutineParams = args
                .into_iter()
                .map(|p| {
                    let field_type = FieldType::from(&p.data_type);
                    RoutineParamDef::default().with_type(field_type)
                })
                .collect();
            let language = RoutineLanguage::from(language);
            let security = RoutineSecurity::default();

            let params = CreateProcParams {
                name,
                params: args,
                language,
                body,
                security,
                owner: current_user,
            };
            params.validate(storage)?;
            Params::CreateProcedure(params)
        }

        IrNode::Ddl(Ddl::CreateTable {
            name,
            format,
            primary_key,
            sharding_key,
            engine_type,
            ..
        }) => {
            let format = format
                .into_iter()
                .map(|f| Field {
                    name: f.name,
                    r#type: FieldType::from(&f.data_type),
                    is_nullable: f.is_nullable,
                })
                .collect();
            // A table with a sharding key is sharded, otherwise it is global.
            let distribution = if sharding_key.is_some() {
                DistributionParam::Sharded
            } else {
                DistributionParam::Global
            };
            let params = CreateTableParams {
                id: None,
                name,
                format,
                primary_key,
                distribution,
                by_field: None,
                sharding_key,
                sharding_fn: Some(ShardingFn::Murmur3),
                engine: Some(engine_type),
                timeout: None,
                owner: current_user,
            };
            params.validate()?;
            Params::CreateTable(params)
        }
        IrNode::Ddl(Ddl::DropProc { name, params, .. }) => {
            // Nothing to check
            Params::DropProcedure(name, params)
        }
        IrNode::Ddl(Ddl::DropTable { name, .. }) => {
            // Nothing to check
            Params::DropTable(name)
        }
        IrNode::Ddl(Ddl::RenameRoutine {
            old_name,
            new_name,
            params,
            ..
        }) => {
            let params = RenameRoutineParams {
                new_name,
                old_name,
                params,
            };
            Params::RenameRoutine(params)
        }
        IrNode::Acl(Acl::DropUser { name, .. }) => {
            // Nothing to check
            Params::DropUser(name)
        }
        IrNode::Acl(Acl::CreateRole { name, .. }) => {
            // Nothing to check
            Params::CreateRole(name)
        }
        IrNode::Acl(Acl::DropRole { name, .. }) => {
            // Nothing to check
            Params::DropRole(name)
        }
        IrNode::Acl(Acl::CreateUser {
            name,
            password,
            auth_method,
            ..
        }) => {
            // The password is validated and turned into auth data right away,
            // so only the resulting `AuthDef` is kept in `Params`.
            let method = AuthMethod::from_str(&auth_method)
                .map_err(|_| Error::Other(format!("Unknown auth method: {auth_method}").into()))?;
            validate_password(&password, &method, node)?;
            let data = AuthData::new(&method, &name, &password);
            let auth = AuthDef::new(method, data.into_string());
            Params::CreateUser(name, auth)
        }
        IrNode::Acl(Acl::AlterUser {
            name, alter_option, ..
        }) => {
            let alter_option_param = match alter_option {
                AlterOption::Password {
                    password,
                    auth_method,
                } => {
                    // Same validation and conversion as for CREATE USER.
                    let method = AuthMethod::from_str(&auth_method).map_err(|_| {
                        Error::Other(format!("Unknown auth method: {auth_method}").into())
                    })?;
                    validate_password(&password, &method, node)?;
                    let data = AuthData::new(&method, &name, &password);
                    let auth = AuthDef::new(method, data.into_string());
                    AlterOptionParam::ChangePassword(auth)
                }
                AlterOption::Login => AlterOptionParam::Login,
                AlterOption::NoLogin => AlterOptionParam::NoLogin,
                AlterOption::Rename { new_name } => AlterOptionParam::Rename(new_name),
            };
            Params::AlterUser(name, alter_option_param)
        }
        IrNode::Acl(Acl::GrantPrivilege {
            grant_type,
            grantee_name,
            ..
        }) => {
            // Nothing to check
            Params::GrantPrivilege(grant_type, grantee_name)
        }
        IrNode::Acl(Acl::RevokePrivilege {
            revoke_type,
            grantee_name,
            ..
        }) => {
            // Nothing to check
            Params::RevokePrivilege(revoke_type, grantee_name)
        }
        n => {
            unreachable!("this function should only be called for ddl or acl nodes, not {n:?}")
        }
    };

    // NOTE(review): presumably `_su` is a guard that keeps the admin session
    // active until it is dropped at the end of the function — confirm against
    // `session::su`.
    let _su = session::su(ADMIN_ID).expect("cant fail because admin should always have session");

    'retry: loop {
        if Instant::now() > deadline {
            return Err(Error::Timeout);
        }

        // This index is later used as the index of the CAS predicate.
        let index = node.read_index(deadline.duration_since(Instant::now()))?;

        // Another schema change is pending: wait until the log advances past
        // `index` and start over.
        if storage.properties.pending_schema_change()?.is_some() {
            node.wait_index(index + 1, deadline.duration_since(Instant::now()))?;
            continue 'retry;
        }

        let schema_version = storage.properties.next_schema_version()?;

        // Check for conflicts and make the op
        let op = match &params {
            Params::CreateProcedure(params) => {
                if params.func_exists() {
                    // Function already exists, no op needed.
                    return Ok(ConsumerResult { row_count: 0 });
                }
                let id = func_next_reserved_id()?;
                let ddl = OpDdl::CreateProcedure {
                    id,
                    name: params.name.clone(),
                    params: params.params.clone(),
                    language: params.language.clone(),
                    body: params.body.clone(),
                    security: params.security.clone(),
                    owner: params.owner,
                };
                Op::DdlPrepare {
                    schema_version,
                    ddl,
                }
            }
            Params::DropProcedure(name, params) => {
                let Some(routine) = &storage.routines.by_name(name)? else {
                    // Procedure doesn't exist yet, no op needed
                    return Ok(ConsumerResult { row_count: 0 });
                };

                // drop by name if no parameters are specified
                if let Some(params) = params {
                    ensure_parameters_match(routine, params)?;
                }

                let ddl = OpDdl::DropProcedure {
                    id: routine.id,
                    initiator: current_user,
                };
                Op::DdlPrepare {
                    schema_version,
                    ddl,
                }
            }
            Params::RenameRoutine(params) => {
                if !params.func_exists() {
                    // Procedure does not exist, nothing to rename
                    return Ok(ConsumerResult { row_count: 0 });
                }
                if params.new_name_occupied() {
                    return Err(Error::Other(
                        format!("Name '{}' is already taken", params.new_name).into(),
                    ));
                }

                let routine_def = node
                    .storage
                    .routines
                    .by_name(&params.old_name)?
                    .expect("if routine ddl is correct, routine must exist");

                // The signature is only checked when the statement specifies one.
                if let Some(params) = params.params.as_ref() {
                    ensure_parameters_match(&routine_def, params)?;
                }

                let ddl = OpDdl::RenameProcedure {
                    routine_id: routine_def.id,
                    new_name: params.new_name.clone(),
                    old_name: params.old_name.clone(),
                    initiator_id: current_user,
                    owner_id: routine_def.owner,
                    schema_version,
                };

                Op::DdlPrepare {
                    ddl,
                    schema_version,
                }
            }
            Params::CreateTable(params) => {
                if params.space_exists()? {
                    // Space already exists, no op needed
                    return Ok(ConsumerResult { row_count: 0 });
                }
                // XXX: this is stupid, we pass raft op by value everywhere even
                // though it's always just dropped right after serialization.
                // This forces us to clone it quite often. The root problem is
                // that we nest structs a lot and having references to structs
                // in other structs (which is what we should be doing) is very
                // painful in rust.
                let mut params = params.clone();
                params.choose_id_if_not_specified()?;
                params.test_create_space(storage)?;
                let ddl = params.into_ddl()?;
                Op::DdlPrepare {
                    schema_version,
                    ddl,
                }
            }
            Params::DropTable(name) => {
                let Some(space_def) = storage.tables.by_name(name)? else {
                    // Space doesn't exist yet, no op needed
                    return Ok(ConsumerResult { row_count: 0 });
                };
                let ddl = OpDdl::DropTable {
                    id: space_def.id,
                    initiator: current_user,
                };
                Op::DdlPrepare {
                    schema_version,
                    ddl,
                }
            }
            Params::CreateUser(name, auth) => {
                let user_def = storage.users.by_name(name)?;
                match user_def {
                    // The name is taken by a role.
                    Some(user_def) if user_def.is_role() => {
                        return Err(Error::Other(format!("Role {name} already exists").into()));
                    }
                    // The user exists: it is a no-op only if the auth is the same.
                    Some(user_def) => {
                        if user_def
                            .auth
                            .expect("user always should have non empty auth")
                            != *auth
                        {
                            return Err(Error::Other(
                                format!("User {name} already exists with different auth method")
                                    .into(),
                            ));
                        }
                        // User already exists, no op needed
                        return Ok(ConsumerResult { row_count: 0 });
                    }
                    None => {
                        let id = node.get_next_grantee_id()?;
                        let user_def = UserDef {
                            id,
                            name: name.clone(),
                            schema_version,
                            auth: Some(auth.clone()),
                            owner: current_user,
                            ty: UserMetadataKind::User,
                        };
                        Op::Acl(OpAcl::CreateUser { user_def })
                    }
                }
            }
            Params::AlterUser(name, alter_option_param) => {
                let user_def = storage.users.by_name(name)?;
                let user_def = match user_def {
                    // Unable to alter role
                    Some(user_def) if user_def.is_role() => {
                        return Err(Error::Other(
                            format!("Role {name} exists. Unable to alter role.").into(),
                        ));
                    }
                    // User doesn't exist, no op needed.
                    None => return Ok(ConsumerResult { row_count: 0 }),
                    Some(user_def) => user_def,
                };

                // For ALTER Login/NoLogin.
                // NOTE(review): `session::euid()` is evaluated after
                // `session::su(ADMIN_ID)` above, so the grantor here is
                // presumably admin rather than `current_user` (which is what
                // GRANT/REVOKE below use) — confirm this is intended.
                let grantor_id = session::euid()?;
                let grantee_id = get_grantee_id(storage, name)?;
                let object_type = SchemaObjectType::Universe;
                let object_id = 0;
                let privilege = PrivilegeType::Login;
                // The definition is built for every ALTER USER option, but it is
                // only used by the Login/NoLogin branches below.
                let priv_def = PrivilegeDef::new(
                    privilege,
                    object_type,
                    object_id,
                    grantee_id,
                    grantor_id,
                    schema_version,
                )
                .map_err(Error::other)?;

                match alter_option_param {
                    AlterOptionParam::ChangePassword(auth) => {
                        if user_def
                            .auth
                            .expect("user always should have non empty auth")
                            == *auth
                        {
                            // Password is already the one given, no op needed.
                            return Ok(ConsumerResult { row_count: 0 });
                        }
                        Op::Acl(OpAcl::ChangeAuth {
                            user_id: user_def.id,
                            auth: auth.clone(),
                            initiator: current_user,
                            schema_version,
                        })
                    }
                    AlterOptionParam::Login => {
                        // It will be checked at a later stage whether login is already granted
                        Op::Acl(OpAcl::GrantPrivilege { priv_def })
                    }
                    AlterOptionParam::NoLogin => {
                        // It will be checked at a later stage whether login was not granted
                        Op::Acl(OpAcl::RevokePrivilege {
                            priv_def,
                            initiator: current_user,
                        })
                    }
                    AlterOptionParam::Rename(new_name) => {
                        if user_def.name == *new_name {
                            // Username is already the one given, no op needed.
                            return Ok(ConsumerResult { row_count: 0 });
                        }
                        // The new name must not be taken by any existing entry.
                        let user = storage.users.by_name(new_name)?;
                        match user {
                            Some(_) => {
                                return Err(Error::Other(
                                    format!(r#"User with name "{new_name}" exists. Unable to rename user "{name}"."#).into(),
                                ))
                            }
                            None => Op::Acl(OpAcl::RenameUser {
                                user_id: user_def.id,
                                name: new_name.into(),
                                initiator: current_user,
                                schema_version,
                            }),
                        }
                    }
                }
            }
            Params::DropUser(name) => {
                let Some(user_def) = storage.users.by_name(name)? else {
                    // User doesn't exist yet, no op needed
                    return Ok(ConsumerResult { row_count: 0 });
                };
                if user_def.is_role() {
                    return Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("Role {name} exists. Unable to drop role.")),
                    )));
                }

                Op::Acl(OpAcl::DropUser {
                    user_id: user_def.id,
                    initiator: current_user,
                    schema_version,
                })
            }
            Params::CreateRole(name) => {
                // Unlike the other branches, this one looks the name up
                // directly in the `_user` system space.
                let sys_user = Space::from(SystemSpace::User)
                    .index("name")
                    .expect("_user should have an index by name")
                    .get(&(name,))?;
                if let Some(user) = sys_user {
                    // NOTE(review): field 3 is compared against "user", so it
                    // presumably holds the entry kind — confirm against the
                    // `_user` space format.
                    let entry_type: &str = user.get(3).unwrap();
                    if entry_type == "user" {
                        return Err(Error::Sbroad(SbroadError::Invalid(
                            Entity::Acl,
                            Some(format!("Unable to create role {name}. User with the same name already exists")),
                        )));
                    } else {
                        // An entry that is not a user already has this name, no op needed.
                        return Ok(ConsumerResult { row_count: 0 });
                    }
                }
                let id = node.get_next_grantee_id()?;
                let role_def = UserDef {
                    id,
                    name: name.clone(),
                    // This field will be updated later.
                    schema_version,
                    owner: current_user,
                    auth: None,
                    ty: UserMetadataKind::Role,
                };
                Op::Acl(OpAcl::CreateRole { role_def })
            }
            Params::DropRole(name) => {
                let Some(role_def) = storage.users.by_name(name)? else {
                    // Role doesn't exist yet, no op needed
                    return Ok(ConsumerResult { row_count: 0 });
                };
                if !role_def.is_role() {
                    return Err(Error::Sbroad(SbroadError::Invalid(
                        Entity::Acl,
                        Some(format!("User {name} exists. Unable to drop user.")),
                    )));
                }

                Op::Acl(OpAcl::DropRole {
                    role_id: role_def.id,
                    initiator: current_user,
                    schema_version,
                })
            }
            Params::GrantPrivilege(grant_type, grantee_name) => {
                let grantor_id = current_user;
                let grantee_id = get_grantee_id(storage, grantee_name)?;
                let (object_type, privilege, object_id) = node.object_resolve(grant_type)?;

                if check_privilege_already_granted(
                    node,
                    grantee_id,
                    &object_type,
                    object_id,
                    &privilege,
                )? {
                    // Privilege is already granted, no op needed.
                    return Ok(ConsumerResult { row_count: 0 });
                }
                Op::Acl(OpAcl::GrantPrivilege {
                    priv_def: PrivilegeDef::new(
                        privilege,
                        object_type,
                        object_id,
                        grantee_id,
                        grantor_id,
                        schema_version,
                    )
                    .map_err(Error::other)?,
                })
            }
            Params::RevokePrivilege(revoke_type, grantee_name) => {
                let grantor_id = current_user;
                let grantee_id = get_grantee_id(storage, grantee_name)?;
                let (object_type, privilege, object_id) = node.object_resolve(revoke_type)?;

                if !check_privilege_already_granted(
                    node,
                    grantee_id,
                    &object_type,
                    object_id,
                    &privilege,
                )? {
                    // Privilege is not granted yet, no op needed.
                    return Ok(ConsumerResult { row_count: 0 });
                }

                Op::Acl(OpAcl::RevokePrivilege {
                    priv_def: PrivilegeDef::new(
                        privilege,
                        object_type,
                        object_id,
                        grantee_id,
                        grantor_id,
                        schema_version,
                    )
                    .map_err(Error::other)?,
                    initiator: current_user,
                })
            }
        };
        // Remember the kind of the op now, because `op` is moved into
        // `compare_and_swap` below.
        let is_ddl_prepare = matches!(op, Op::DdlPrepare { .. });

        // The predicate is built from the index obtained at the start of this
        // iteration and the term of that index.
        let term = raft::Storage::term(&node.raft_storage, index)?;
        let predicate = cas::Predicate {
            index,
            term,
            ranges: cas::schema_change_ranges().into(),
        };
        // Note: as_user doesn't really serve any purpose for DDL checks
        // It'll change when access control checks will be introduced for DDL
        let res = cas::compare_and_swap(
            op,
            predicate,
            current_user,
            deadline.duration_since(Instant::now()),
        );
        // On success `index` and `term` are rebound to the values returned by
        // the CAS call (presumably those of the proposed entry); a retriable
        // error restarts the whole iteration.
        let (index, term) = unwrap_ok_or!(res,
            Err(e) => {
                if e.is_retriable() {
                    continue 'retry;
                } else {
                    return Err(e);
                }
            }
        );

        node.wait_index(index, deadline.duration_since(Instant::now()))?;
        // Only `DdlPrepare` ops are additionally waited on via `wait_for_ddl_commit`.
        if is_ddl_prepare {
            wait_for_ddl_commit(index, deadline.duration_since(Instant::now()))?;
        }

        if term != raft::Storage::term(&node.raft_storage, index)? {
            // Leader has changed and the entry got rolled back, retry.
            continue 'retry;
        }

        return Ok(ConsumerResult { row_count: 1 });
    }

    // Local counterpart of `AlterOption` in which the password option has
    // already been converted into an `AuthDef`.
    enum AlterOptionParam {
        ChangePassword(AuthDef),
        Login,
        NoLogin,
        Rename(String),
    }

    // Parameters of a schema change request, produced once from the IR node
    // and then inspected on every iteration of the retry loop.
    // THOUGHT: should `owner_id` be part of `CreateUser`, `CreateRole` params?
    enum Params {
        CreateTable(CreateTableParams),
        DropTable(String),
        CreateUser(String, AuthDef),
        AlterUser(String, AlterOptionParam),
        DropUser(String),
        CreateRole(String),
        DropRole(String),
        GrantPrivilege(GrantRevokeType, String),
        RevokePrivilege(GrantRevokeType, String),
        RenameRoutine(RenameRoutineParams),
        CreateProcedure(CreateProcParams),
        DropProcedure(String, Option<Vec<ParamDef>>),
    }
}

/// Key looked up in the baggage of the tracing context to find out which
/// tracer kind should be used.
const TRACER_KEY: &str = "Tracer";

/// Executes a query sub-plan on the local node.
#[proc(packed_args)]
pub fn execute(raw: &RawBytes) -> traft::Result<Tuple> {
    let (raw_required, mut raw_optional) = decode_msgpack(raw)?;

    let mut required = RequiredData::try_from(EncodedRequiredData::from(raw_required))?;

    let tracing_meta = std::mem::take(&mut required.tracing_meta);
    let mut exec = || {
        let runtime = StorageRuntime::new().map_err(Error::from)?;
        match runtime.execute_plan(&mut required, &mut raw_optional) {
            Ok(mut any_tuple) => {
                if let Some(tuple) = any_tuple.downcast_mut::<Tuple>() {
                    debug!(
                        Option::from("execute"),
                        &format!("Execution result: {tuple:?}"),
                    );
                    let tuple: Tuple = std::mem::replace(tuple, Tuple::new(&())?);
                    Ok(tuple)
                } else {
                    Err(Error::from(SbroadError::FailedTo(
                        Action::Decode,
                        None,
                        format!("tuple {any_tuple:?}"),
                    )))
                }
            }
            Err(e) => Err(Error::from(e)),
        }
    };

    if let Some(mut meta) = tracing_meta {
        let ctx: Context = (&mut meta.context).into();
        let tracer_kind = ctx.baggage().get(TRACER_KEY);
        let kind = if let Some(value) = tracer_kind {
            TracerKind::from_str(&value.as_str()).map_err(|_| {
                Error::from(SbroadError::Invalid(
                    Entity::RequiredData,
                    Some(format!("unknown tracer: {}", value.as_str())),
                ))
            })?
        } else {
            return Err(Error::from(SbroadError::Invalid(
                Entity::RequiredData,
                Some("no tracer in context".into()),
            )));
        };

        let tracer = kind.get_tracer();
        query_span::<Result<Tuple, Error>, _>(
            "\"api.storage.execute\"",
            &meta.trace_id,
            tracer,
            &ctx,
            "",
            exec,
        )
    } else {
        exec()
    }
}