use sbroad::ir::ddl::Language;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::fmt::Display;
use std::time::Duration;

use tarantool::auth::AuthData;
use tarantool::auth::AuthDef;
use tarantool::auth::AuthMethod;
use tarantool::error::TarantoolError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::session::UserId;
use tarantool::set_error;
use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
use tarantool::space::{Metadata as SpaceMetadata, Space, SpaceType, SystemSpace};
use tarantool::transaction::{transaction, TransactionError};
use tarantool::{
    index::IteratorType,
    index::Metadata as IndexMetadata,
    index::{IndexId, Part},
    space::SpaceId,
    tlua::{self, LuaRead},
    tuple::Encode,
    util::Value,
};

use sbroad::ir::value::Value as IrValue;

use serde::{Deserialize, Serialize};

use crate::cas::{self, compare_and_swap};
use crate::storage;
use crate::storage::{Clusterwide, SPACE_ID_INTERNAL_MAX};
use crate::storage::{ClusterwideTable, PropertyName};
use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op};
use crate::traft::{self, node, RaftIndex};
use crate::util::effective_user_id;

/// The initial local schema version. Immediately after the cluster is bootted
/// it has this schema version.
///
/// If a schema definition is marked with this version, it means the schema
/// defintion is builtin and is applied by default on all instances.
pub const INITIAL_SCHEMA_VERSION: u64 = 0;

////////////////////////////////////////////////////////////////////////////////
// TableDef
////////////////////////////////////////////////////////////////////////////////

/// Database table definition.
///
/// Describes a user-defined table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableDef {
    pub id: SpaceId,
    pub name: String,
    pub distribution: Distribution,
    pub format: Vec<tarantool::space::Field>,
    pub schema_version: u64,
    pub operable: bool,
    pub engine: SpaceEngineType,
    pub owner: UserId,
}

impl Encode for TableDef {}

impl TableDef {
    /// Index (0-based) of field "operable" in the _pico_table table format.
    pub const FIELD_OPERABLE: usize = 5;

    /// Format of the _pico_table global table.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        vec![
            Field::from(("id", FieldType::Unsigned)),
            Field::from(("name", FieldType::String)),
            Field::from(("distribution", FieldType::Array)),
            Field::from(("format", FieldType::Array)),
            Field::from(("schema_version", FieldType::Unsigned)),
            Field::from(("operable", FieldType::Boolean)),
            Field::from(("engine", FieldType::String)),
            Field::from(("owner", FieldType::Unsigned)),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            id: 10569,
            name: "stuff".into(),
            distribution: Distribution::Global,
            format: vec![],
            schema_version: 420,
            operable: true,
            engine: SpaceEngineType::Blackhole,
            owner: 42,
        }
    }

    pub fn to_space_metadata(&self) -> traft::Result<SpaceMetadata> {
        let format = fields_to_format(&self.format);

        let mut flags = BTreeMap::new();
        if matches!(self.distribution, Distribution::Global) {
            flags.insert("group_id".into(), 1.into());
        }

        let space_def = SpaceMetadata {
            id: self.id,
            user_id: self.owner,
            name: self.name.as_str().into(),
            engine: self.engine,
            field_count: 0,
            flags,
            format,
        };

        Ok(space_def)
    }
}

/// Definitions of builtin tables & their respective indexes.
/// These should be inserted into "_pico_table" & "_pico_index" at cluster bootstrap.
pub fn system_table_definitions() -> Vec<(TableDef, Vec<IndexDef>)> {
    let mut result = Vec::with_capacity(ClusterwideTable::all_tables().len());
    for sys_table in ClusterwideTable::all_tables() {
        let table_def = TableDef {
            id: sys_table.id(),
            name: sys_table.name().into(),
            distribution: Distribution::Global,
            format: sys_table.format(),
            // This means the local schema is already up to date and main loop doesn't need to do anything
            schema_version: INITIAL_SCHEMA_VERSION,
            operable: true,
            engine: SpaceEngineType::Memtx,
            owner: ADMIN_ID,
        };

        let index_defs = sys_table.index_definitions();
        result.push((table_def, index_defs));
    }

    // TODO: there's also "_raft_log" & "_raft_state" spaces, but we don't treat
    // them the same as others for some reason?

    result
}

// FIXME: move this to tarantool-module
pub fn fields_to_format(
    fields: &[tarantool::space::Field],
) -> Vec<BTreeMap<Cow<'static, str>, Value<'_>>> {
    let mut result = Vec::with_capacity(fields.len());
    for field in fields {
        let mut field_map = BTreeMap::new();
        field_map.insert("name".into(), Value::Str(field.name.as_str().into()));
        field_map.insert("type".into(), Value::Str(field.field_type.as_str().into()));
        field_map.insert("is_nullable".into(), Value::Bool(field.is_nullable));
        result.push(field_map);
    }
    result
}

////////////////////////////////////////////////////////////////////////////////
// Distribution
////////////////////////////////////////////////////////////////////////////////

/// Defines how to distribute tuples in a table across replicasets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "kind")]
pub enum Distribution {
    /// Tuples will be replicated to each instance.
    Global,
    /// Tuples will be implicitely sharded. E.g. sent to the corresponding bucket
    /// which will be determined by a hash of the provided `sharding_key`.
    ShardedImplicitly {
        sharding_key: Vec<String>,
        #[serde(default)]
        sharding_fn: ShardingFn,
    },
    /// Tuples will be explicitely sharded. E.g. sent to the bucket
    /// which id is provided by field that is specified here.
    ///
    /// Default field name: "bucket_id"
    ShardedByField {
        #[serde(default = "default_bucket_id_field")]
        field: String,
    },
}

fn default_bucket_id_field() -> String {
    "bucket_id".into()
}

::tarantool::define_str_enum! {
    /// Custom sharding functions are not yet supported.
    #[derive(Default)]
    pub enum ShardingFn {
        Crc32 = "crc32",
        #[default]
        Murmur3 = "murmur3",
        Xxhash = "xxhash",
        Md5 = "md5",
    }
}

////////////////////////////////////////////////////////////////////////////////
// IndexDef
////////////////////////////////////////////////////////////////////////////////

/// Database index definition.
///
/// Describes a user-defined index.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexDef {
    pub table_id: SpaceId,
    pub id: IndexId,
    pub name: String,
    // TODO: document this field's meaning, for now it seems it's always `true`
    pub local: bool,
    pub parts: Vec<Part>,
    pub schema_version: u64,
    pub operable: bool,
    pub unique: bool,
}

impl Encode for IndexDef {}

impl IndexDef {
    /// Index (0-based) of field "operable" in _pico_index table format.
    pub const FIELD_OPERABLE: usize = 6;

    /// Format of the _pico_index global table.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        vec![
            Field::from(("table_id", FieldType::Unsigned)),
            Field::from(("id", FieldType::Unsigned)),
            Field::from(("name", FieldType::String)),
            Field::from(("local", FieldType::Boolean)),
            Field::from(("parts", FieldType::Array)),
            Field::from(("schema_version", FieldType::Unsigned)),
            Field::from(("operable", FieldType::Boolean)),
            Field::from(("unique", FieldType::Boolean)),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            table_id: 10569,
            id: 1,
            name: "secondary".into(),
            local: true,
            parts: vec![],
            schema_version: 420,
            operable: true,
            unique: false,
        }
    }

    pub fn to_index_metadata(&self) -> IndexMetadata {
        use tarantool::index::IndexType;

        let mut opts = BTreeMap::new();
        opts.insert(Cow::from("unique"), Value::Bool(self.unique));
        let index_meta = IndexMetadata {
            space_id: self.table_id,
            index_id: self.id,
            name: self.name.as_str().into(),
            r#type: IndexType::Tree,
            opts,
            parts: self.parts.clone(),
        };

        index_meta
    }
}

////////////////////////////////////////////////////////////////////////////////
// UserDef
////////////////////////////////////////////////////////////////////////////////

/// User definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserDef {
    pub id: UserId,
    pub name: String,
    pub schema_version: u64,
    pub auth: AuthDef,
    pub owner: UserId,
}

impl Encode for UserDef {}

impl UserDef {
    /// Index of field "auth" in the space _pico_user format.
    ///
    /// Index of first field is 0.
    pub const FIELD_AUTH: usize = 3;

    /// Format of the _pico_user global table.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        vec![
            Field::from(("id", FieldType::Unsigned)),
            Field::from(("name", FieldType::String)),
            Field::from(("schema_version", FieldType::Unsigned)),
            Field::from(("auth", FieldType::Array)),
            Field::from(("owner", FieldType::Unsigned)),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            id: 69,
            name: "david".into(),
            schema_version: 421,
            auth: AuthDef::new(tarantool::auth::AuthMethod::ChapSha1, "".into()),
            owner: 42,
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// RoleDef
////////////////////////////////////////////////////////////////////////////////

/// Role definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoleDef {
    pub id: UserId,
    pub name: String,
    pub schema_version: u64,
    pub owner: UserId,
}

impl Encode for RoleDef {}

impl RoleDef {
    /// Format of the _pico_role global table.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        vec![
            Field::from(("id", FieldType::Unsigned)),
            Field::from(("name", FieldType::String)),
            Field::from(("schema_version", FieldType::Unsigned)),
            Field::from(("owner", FieldType::Unsigned)),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            id: 13,
            name: "devops".into(),
            schema_version: 419,
            owner: 42,
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// PrivilegeDef
////////////////////////////////////////////////////////////////////////////////

/// User id of the builtin user "guest".
///
/// Default "untrusted" user.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const GUEST_ID: UserId = 0;

/// User id of the builtin user "admin".
///
/// Note: Admin user id is used when we need to elevate privileges
/// because current user doesnt have access to system spaces
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const ADMIN_ID: UserId = 1;

/// User id of the builtin role "public".
///
/// Pre-defined role, automatically granted to new users.
/// Granting some privilege to this role is equivalent granting the privilege to everybody.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const PUBLIC_ID: UserId = 2;

/// User id of the builtin role "replication".
///
/// Role "replication" has the following grants:
/// - Read access to the "universe"
/// - Write access to the space "_cluster"
pub const ROLE_REPLICATION_ID: i64 = 3;

/// User id of the builtin role "super".
///
/// Users with this role have access to everything.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const SUPER_ID: UserId = 31;

/// User id of the builtin user "pico_service".
///
/// A special user for internal communication between instances of picodata.
/// It is equivalent in it's privileges to "admin". The only difference is that
/// only the automated rpc calls are performed as "pico_service".
pub const PICO_SERVICE_ID: UserId = 32;

/// Object id of the special builtin object "universe".
///
/// Object "universe" is basically an alias to the "whole database".
/// Granting access to "universe" is equivalent to granting access
/// to all objects of all types for which the privilege makes sense.
pub const UNIVERSE_ID: i64 = 0;

tarantool::define_str_enum! {
    pub enum SchemaObjectType {
        Table = "table",
        Role = "role",
        Routine = "routine",
        User = "user",
        Universe = "universe",
    }
}

impl SchemaObjectType {
    pub fn as_tarantool(&self) -> &'static str {
        match self {
            SchemaObjectType::Table => "space",
            t => t.as_str(),
        }
    }
}

tarantool::define_str_enum! {
    /// Picodata privilege types. For correspondence with
    /// tarantool privilege types see [`PrivilegeType::as_tarantool`].
    ///
    /// For each variant it is described which SQL queries in picodata
    /// a user with this privilege can execute.
    pub enum PrivilegeType {
        /// Allows SQL queries: `SELECT`
        Read = "read",
        /// Allows SQL queries: `INSERT`, `UPDATE`, `DELETE`
        Write = "write",
        /// Allows SQL queries: `CALL`.
        /// Also can indicate that role is granted to a user
        /// if object type is role.
        Execute = "execute",
        /// Allows a user to log-in when connecting to picodata instance
        Login = "login",
        /// Allows SQL queries: `CREATE`
        Create = "create",
        /// Allows SQL queries: `DROP`
        Drop = "drop",
        /// Allows SQL queries: `ALTER`
        Alter = "alter",
    }
}

impl PrivilegeType {
    /// Converts picodata privilege to a string that can be passed to
    /// `box.schema.user.grant` as `privileges`
    pub fn as_tarantool(&self) -> &'static str {
        match self {
            PrivilegeType::Login => "usage",
            t => t.as_str(),
        }
    }
}

/// Privilege definition.
/// Note the differences between picodata privileges and vanilla tarantool ones.
/// 1) Super role in picodata is a placeholder. It exists because picodata reuses user
///    ids from vanilla and super occupies an id. So to avoid assigning some piciodata
///    user Id of the vanilla role super we keep it as a placeholder. Aside from that
///    vanilla super role has some privileges that cannot be represented in picodata,
///    namely privileges on universe. This hole opens possibility for unwanted edge cases.
/// 2) Only Session and Usage can be granted on Universe.
/// Note that validation is not performed in Deserialize. We assume that for untrusted data
/// the object is created via constructor. In other cases validation was already performed
/// prior to serialization thus deserialization always creates a valid object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PrivilegeDef {
    privilege: PrivilegeType,
    object_type: SchemaObjectType,
    /// `-1` denotes an absense of a target object.
    /// Other values should be >= 0 and denote an existing target object.
    /// When working with `object_type` `universe` it might seem that it does
    /// not have a target object and `object_id` should be `-1`, this is incorrect
    /// universe has a target object with `object_id == 0`.
    ///
    /// To get the value of this field as `Option<u32>` see [`Self::object_id`]
    object_id: i64,
    /// Id of the user or role to whom the privilege is granted.
    ///
    /// In tarantool users and roles are stored in the same space, which means a
    /// role and a user cannot have the same id or name.
    grantee_id: UserId,
    grantor_id: UserId,
    schema_version: u64,
}

impl Encode for PrivilegeDef {}

#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub struct InvalidPrivilegeError {
    object_type: SchemaObjectType,
    unsupported: PrivilegeType,
    expected_one_of: &'static [PrivilegeType],
}

impl Display for InvalidPrivilegeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "Unsupported {} privilege '{}', expected one of {:?}",
            self.object_type, self.unsupported, self.expected_one_of,
        ))
    }
}

impl PrivilegeDef {
    pub fn new(
        privilege: PrivilegeType,
        object_type: SchemaObjectType,
        object_id: i64,
        grantee_id: UserId,
        grantor_id: UserId,
        schema_version: u64,
    ) -> Result<PrivilegeDef, InvalidPrivilegeError> {
        let privilege_def = PrivilegeDef {
            privilege,
            object_type,
            object_id,
            grantee_id,
            grantor_id,
            schema_version,
        };

        use PrivilegeType::*;

        let valid_privileges: &[PrivilegeType] = match privilege_def.object_type {
            SchemaObjectType::Table => match privilege_def.object_id() {
                Some(_) => &[Read, Write, Alter, Drop],
                None => &[Read, Write, Create, Alter, Drop],
            },
            SchemaObjectType::Role => match privilege_def.object_id() {
                Some(_) => &[Execute, Drop],
                None => &[Create, Drop],
            },
            SchemaObjectType::User => match privilege_def.object_id() {
                Some(_) => &[Alter, Drop],
                None => &[Create, Alter, Drop],
            },
            SchemaObjectType::Universe => &[Login],
            SchemaObjectType::Routine => match privilege_def.object_id() {
                Some(_) => &[Execute, Drop],
                None => &[Create, Drop],
            },
        };

        if !valid_privileges.contains(&privilege) {
            return Err(InvalidPrivilegeError {
                object_type,
                unsupported: privilege,
                expected_one_of: valid_privileges,
            });
        }

        Ok(privilege_def)
    }

    #[inline(always)]
    pub fn privilege(&self) -> PrivilegeType {
        self.privilege
    }

    #[inline(always)]
    pub fn object_type(&self) -> SchemaObjectType {
        self.object_type
    }

    /// Get `object_id` field interpreting `-1` as `None`.
    #[inline(always)]
    pub fn object_id(&self) -> Option<u32> {
        if self.object_id >= 0 {
            Some(self.object_id as _)
        } else {
            debug_assert_eq!(self.object_id, -1, "object_id should be >= -1");
            None
        }
    }

    #[inline(always)]
    pub fn object_id_raw(&self) -> i64 {
        self.object_id
    }

    #[inline(always)]
    pub fn grantee_id(&self) -> UserId {
        self.grantee_id
    }

    #[inline(always)]
    pub fn grantor_id(&self) -> UserId {
        self.grantor_id
    }

    #[inline(always)]
    pub fn schema_version(&self) -> u64 {
        self.schema_version
    }

    /// Format of the _pico_privilege global table.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        vec![
            Field::from(("privilege", FieldType::String)),
            Field::from(("object_type", FieldType::String)),
            Field::from(("object_id", FieldType::Integer)),
            Field::from(("grantee_id", FieldType::Unsigned)),
            Field::from(("grantor_id", FieldType::Unsigned)),
            Field::from(("schema_version", FieldType::Unsigned)),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            grantor_id: 13,
            grantee_id: 37,
            object_type: SchemaObjectType::User,
            object_id: -1,
            privilege: PrivilegeType::Create,
            schema_version: 337,
        }
    }

    /// Retrieves object_name from system spaces based on `object_id` and `object_type`.
    /// Returns `Ok(None)` in the case when the privilege has no target object (e.g. `object_id == -1`)
    /// or when target object is universe.
    /// Returns `Err` in case when target object was not found in system tables.
    ///
    /// # Panics
    /// 1. On storage failure
    pub fn resolve_object_name(&self, storage: &Clusterwide) -> Result<Option<String>, Error> {
        let Some(id) = self.object_id() else {
            return Ok(None);
        };
        let name = match self.object_type {
            SchemaObjectType::Table => storage.tables.get(id).map(|t| t.map(|t| t.name)),
            SchemaObjectType::Role => storage.roles.by_id(id).map(|t| t.map(|t| t.name)),
            SchemaObjectType::User => storage.users.by_id(id).map(|t| t.map(|t| t.name)),
            SchemaObjectType::Universe => {
                debug_assert_eq!(self.object_id, 0);
                return Ok(None);
            }
            SchemaObjectType::Routine => storage.routines.by_id(id).map(|t| t.map(|t| t.name)),
        }
        .expect("storage should not fail")
        .ok_or_else(|| Error::other(format!("object with id {id} should exist")))?;
        Ok(Some(name))
    }
}

////////////////////////////////////////////////////////////////////////////////
// system_user_definitions
////////////////////////////////////////////////////////////////////////////////

/// Definitions of builtin users & their respective privileges.
/// These should be inserted into "_pico_user" & "_pico_privilege" at cluster bootstrap.
pub fn system_user_definitions() -> Vec<(UserDef, Vec<PrivilegeDef>)> {
    let mut result = vec![];

    let initiator = ADMIN_ID;

    //
    // User "guest"
    //
    // equivalent SQL expression: CREATE USER 'guest' WITH PASSWORD '' USING chap-sha1
    {
        let user_def = UserDef {
            id: GUEST_ID,
            name: "guest".into(),
            // This means the local schema is already up to date and main loop doesn't need to do anything
            schema_version: INITIAL_SCHEMA_VERSION,
            auth: AuthDef::new(
                AuthMethod::ChapSha1,
                AuthData::new(&AuthMethod::ChapSha1, "guest", "").into_string(),
            ),
            owner: initiator,
        };
        let priv_defs = vec![
            PrivilegeDef {
                grantee_id: user_def.id,
                privilege: PrivilegeType::Login,
                object_type: SchemaObjectType::Universe,
                object_id: UNIVERSE_ID,
                // This means the local schema is already up to date and main loop doesn't need to do anything
                schema_version: INITIAL_SCHEMA_VERSION,
                grantor_id: initiator,
            },
            PrivilegeDef {
                grantee_id: user_def.id,
                privilege: PrivilegeType::Execute,
                object_type: SchemaObjectType::Role,
                object_id: PUBLIC_ID as _,
                // This means the local schema is already up to date and main loop doesn't need to do anything
                schema_version: INITIAL_SCHEMA_VERSION,
                grantor_id: initiator,
            },
        ];
        result.push((user_def, priv_defs));
    }

    //
    // User "admin"
    //
    {
        let user_def = UserDef {
            id: ADMIN_ID,
            name: "admin".into(),
            // This means the local schema is already up to date and main loop doesn't need to do anything
            schema_version: INITIAL_SCHEMA_VERSION,
            // This place slightly differs from the tarantool
            // implementation. In vanilla tarantool the auth_def is an empty
            // MP_MAP. Here for simplicity given available module api we
            // use ChapSha with invalid password (its impossible to get
            // empty string as output of sha1)
            auth: AuthDef::new(AuthMethod::ChapSha1, "".into()),
            owner: initiator,
        };
        let mut priv_defs = Vec::with_capacity(PrivilegeType::VARIANTS.len());
        // Grant all privileges on "universe" to "admin".
        for &privilege in PrivilegeType::VARIANTS {
            priv_defs.push(PrivilegeDef {
                grantee_id: user_def.id,
                privilege,
                object_type: SchemaObjectType::Universe,
                object_id: UNIVERSE_ID,
                // This means the local schema is already up to date and main loop doesn't need to do anything
                schema_version: INITIAL_SCHEMA_VERSION,
                grantor_id: initiator,
            });
        }
        result.push((user_def, priv_defs));
    }

    {
        let user_def = UserDef {
            id: PICO_SERVICE_ID,
            name: PICO_SERVICE_USER_NAME.into(),
            // This means the local schema is already up to date and main loop doesn't need to do anything
            schema_version: INITIAL_SCHEMA_VERSION,
            auth: AuthDef::new(
                AuthMethod::ChapSha1,
                tarantool::auth::AuthData::new(&AuthMethod::ChapSha1, PICO_SERVICE_USER_NAME, "")
                    .into_string(),
            ),
            owner: initiator,
        };
        let mut priv_defs = Vec::with_capacity(PrivilegeType::VARIANTS.len() + 1);
        // Grant all privileges on "universe" to "pico_service".
        for &privilege in PrivilegeType::VARIANTS {
            priv_defs.push(PrivilegeDef {
                grantee_id: user_def.id,
                privilege,
                object_type: SchemaObjectType::Universe,
                object_id: UNIVERSE_ID,
                // This means the local schema is already up to date and main loop doesn't need to do anything
                schema_version: INITIAL_SCHEMA_VERSION,
                grantor_id: initiator,
            });
        }
        // Role "replication" is needed explicitly
        priv_defs.push(PrivilegeDef {
            grantee_id: user_def.id,
            privilege: PrivilegeType::Execute,
            object_type: SchemaObjectType::Role,
            object_id: ROLE_REPLICATION_ID,
            // This means the local schema is already up to date and main loop doesn't need to do anything
            schema_version: INITIAL_SCHEMA_VERSION,
            grantor_id: initiator,
        });
        result.push((user_def, priv_defs));
    }

    result
}

/// Definitions of builtin roles & their respective privileges.
/// These should be inserted into "_pico_role" & "_pico_privilege" at cluster bootstrap.
// TODO: maybe this "_pico_role" should be merged with "_pico_user"
pub fn system_role_definitions() -> Vec<(RoleDef, Vec<PrivilegeDef>)> {
    let mut result = vec![];

    let public_def = RoleDef {
        id: PUBLIC_ID,
        name: "public".into(),
        // This means the local schema is already up to date and main loop doesn't need to do anything
        schema_version: INITIAL_SCHEMA_VERSION,
        owner: ADMIN_ID,
    };
    let public_privs = vec![
        // TODO:
        // - we grant "execute" access to a bunch of stored procs
        // - vanilla tarantool grants "read" access to a bunch of system spaces
    ];
    result.push((public_def, public_privs));

    let super_def = RoleDef {
        id: SUPER_ID,
        name: "super".into(),
        // This means the local schema is already up to date and main loop doesn't need to do anything
        schema_version: INITIAL_SCHEMA_VERSION,
        owner: ADMIN_ID,
    };
    let super_privs = vec![
        // Special role, it's privileges are implicit
    ];
    result.push((super_def, super_privs));

    // There's also a "replication" builtin role, but we don't care about it?

    result
}

////////////////////////////////////////////////////////////////////////////////
// init_pico_service
////////////////////////////////////////////////////////////////////////////////

/// Name of the special builtin user for internal communication between
/// instances. It's id is [`PICO_SERVICE_ID`].
///
/// Use this constant instead of literal "pico_service" so that it's easier to
/// find all the places where we refer to "pico_service".
pub const PICO_SERVICE_USER_NAME: &'static str = "pico_service";

pub fn init_user_pico_service() {
    let sys_user = SystemSpace::User.as_space();
    let sys_priv = SystemSpace::Priv.as_space();

    let t = sys_user
        .get(&[PICO_SERVICE_ID])
        .expect("reading from _user shouldn't fail");
    if t.is_some() {
        // Already exists (instance restarted)
        return;
    }

    let found = system_user_definitions()
        .into_iter()
        .find(|(user_def, _)| user_def.id == PICO_SERVICE_ID);
    let Some((user_def, _)) = &found else {
        panic!("Couldn't find definition for '{PICO_SERVICE_USER_NAME}' system user");
    };

    let res = storage::acl::on_master_create_user(user_def, false);
    if let Err(e) = res {
        panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
    }

    // Grant ALL privileges to "every object of every type".
    const PRIVILEGE_ALL: u32 = 0xffff_ffff;
    let res = sys_priv.insert(&(
        ADMIN_ID,
        PICO_SERVICE_ID,
        "universe",
        UNIVERSE_ID,
        PRIVILEGE_ALL,
    ));
    if let Err(e) = res {
        panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
    }

    // Also grant role "replication", because all privileges to "every object of
    // every type" is not enough.
    const PRIVILEGE_EXECUTE: u32 = 4;
    let res = sys_priv.insert(&(
        ADMIN_ID,
        PICO_SERVICE_ID,
        "role",
        ROLE_REPLICATION_ID,
        PRIVILEGE_EXECUTE,
    ));
    if let Err(e) = res {
        panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
    }
}

////////////////////////////////////////////////////////////////////////////////
// RoutineDef
////////////////////////////////////////////////////////////////////////////////

/// Routine kind.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineKind {
    #[default]
    Procedure,
}

impl Display for RoutineKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RoutineKind::Procedure => write!(f, "procedure"),
        }
    }
}

/// Parameter mode.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineParamMode {
    #[default]
    In,
}

impl Display for RoutineParamMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RoutineParamMode::In => write!(f, "in"),
        }
    }
}

/// Routine parameter definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutineParamDef {
    #[serde(default)]
    pub mode: RoutineParamMode,
    pub r#type: FieldType,
    #[serde(default)]
    pub default: Option<IrValue>,
}

impl Display for RoutineParamDef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "mode: {}, type: {}", self.mode, self.r#type)?;
        if let Some(default) = &self.default {
            write!(f, ", default: {}", default)?;
        }
        Ok(())
    }
}

impl Default for RoutineParamDef {
    fn default() -> Self {
        Self {
            mode: RoutineParamMode::default(),
            r#type: FieldType::Scalar,
            default: None,
        }
    }
}

impl RoutineParamDef {
    pub fn with_type(self, r#type: FieldType) -> Self {
        Self { r#type, ..self }
    }
}

pub type RoutineParams = Vec<RoutineParamDef>;

pub type RoutineReturns = Vec<IrValue>;

#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineLanguage {
    #[default]
    SQL,
}

impl From<Language> for RoutineLanguage {
    fn from(language: Language) -> Self {
        match language {
            Language::SQL => Self::SQL,
        }
    }
}

#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineSecurity {
    #[default]
    Invoker,
}

/// Routine definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutineDef {
    pub id: u32,
    pub name: String,
    pub kind: RoutineKind,
    pub params: RoutineParams,
    pub returns: RoutineReturns,
    pub language: RoutineLanguage,
    pub body: String,
    pub security: RoutineSecurity,
    pub operable: bool,
    pub schema_version: u64,
    pub owner: UserId,
}

impl Encode for RoutineDef {}

impl RoutineDef {
    /// Index (0-based) of field "operable" in _pico_routine table format.
    pub const FIELD_OPERABLE: usize = 8;

    /// Format of the _pico_routine global table.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        vec![
            Field::from(("id", FieldType::Unsigned)),
            Field::from(("name", FieldType::String)),
            Field::from(("kind", FieldType::String)),
            Field::from(("params", FieldType::Array)),
            Field::from(("returns", FieldType::Array)),
            Field::from(("language", FieldType::String)),
            Field::from(("body", FieldType::String)),
            Field::from(("security", FieldType::String)),
            Field::from(("operable", FieldType::Boolean)),
            Field::from(("schema_version", FieldType::Unsigned)),
            Field::from(("owner", FieldType::Unsigned)),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            id: 16005,
            name: "proc".into(),
            kind: RoutineKind::Procedure,
            params: vec![
                RoutineParamDef {
                    mode: RoutineParamMode::In,
                    r#type: FieldType::String,
                    default: Some(IrValue::String("hello".into())),
                },
                RoutineParamDef {
                    mode: RoutineParamMode::In,
                    r#type: FieldType::Unsigned,
                    default: None,
                },
            ],
            returns: vec![],
            language: RoutineLanguage::SQL,
            body: "values (?), (?)".into(),
            security: RoutineSecurity::Invoker,
            operable: true,
            schema_version: 421,
            owner: 42,
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// ...
////////////////////////////////////////////////////////////////////////////////

// TODO: this should be a TryFrom in tarantool-module
pub fn try_space_field_type_to_index_field_type(
    ft: tarantool::space::FieldType,
) -> Option<tarantool::index::FieldType> {
    use tarantool::index::FieldType as IFT;
    use tarantool::space::FieldType as SFT;
    let res = match ft {
        SFT::Any => None,
        SFT::Unsigned => Some(IFT::Unsigned),
        SFT::String => Some(IFT::String),
        SFT::Number => Some(IFT::Number),
        SFT::Double => Some(IFT::Double),
        SFT::Integer => Some(IFT::Integer),
        SFT::Boolean => Some(IFT::Boolean),
        SFT::Varbinary => Some(IFT::Varbinary),
        SFT::Scalar => Some(IFT::Scalar),
        SFT::Decimal => Some(IFT::Decimal),
        SFT::Uuid => Some(IFT::Uuid),
        SFT::Datetime => Some(IFT::Datetime),
        SFT::Interval => None,
        SFT::Array => Some(IFT::Array),
        SFT::Map => None,
    };
    res
}

#[derive(Debug, thiserror::Error)]
pub enum DdlError {
    #[error("{0}")]
    CreateTable(#[from] CreateTableError),
    #[error("ddl operation was aborted")]
    Aborted,
    #[error("there is no pending ddl operation")]
    NoPendingDdl,
    #[error("{0}")]
    CreateRoutine(#[from] CreateRoutineError),
}

#[derive(Debug, thiserror::Error)]
pub enum CreateTableError {
    #[error("space with id {id} exists with a different name '{actual_name}', but expected '{expected_name}'")]
    ExistsWithDifferentName {
        id: SpaceId,
        expected_name: String,
        actual_name: String,
    },
    #[error("several fields have the same name: {0}")]
    DuplicateFieldName(String),
    #[error("no field with name: {0}")]
    FieldUndefined(String),
    #[error("distribution is `sharded`, but neither `by_field` nor `sharding_key` is set")]
    ShardingPolicyUndefined,
    #[error("only one of sharding policy fields (`by_field`, `sharding_key`) should be set")]
    ConflictingShardingPolicy,
    #[error("global spaces only support memtx engine")]
    IncompatibleGlobalSpaceEngine,
}

impl From<CreateTableError> for Error {
    fn from(err: CreateTableError) -> Self {
        DdlError::CreateTable(err).into()
    }
}

#[derive(Debug, thiserror::Error)]
pub enum CreateRoutineError {
    #[error("routine {name} already exists with a different kind")]
    ExistsWithDifferentKind { name: String },
    #[error("routine {name} already exists with different parameters")]
    ExistsWithDifferentParams { name: String },
    #[error("routine {name} already exists with a different language")]
    ExistsWithDifferentLanguage { name: String },
    #[error("routine {name} already exists with a different body")]
    ExistsWithDifferentBody { name: String },
    #[error("routine {name} already exists with a different security")]
    ExistsWithDifferentSecurity { name: String },
    #[error("routine {name} already exists with a different owner")]
    ExistsWithDifferentOwner { name: String },
}

impl From<CreateRoutineError> for Error {
    fn from(err: CreateRoutineError) -> Self {
        DdlError::CreateRoutine(err).into()
    }
}

// TODO: Add `LuaRead` to tarantool::space::Field and use it
#[derive(Clone, Debug, LuaRead)]
pub struct Field {
    pub name: String, // TODO(gmoshkin): &str
    pub r#type: FieldType,
    pub is_nullable: bool,
}

impl From<Field> for tarantool::space::Field {
    fn from(field: Field) -> Self {
        tarantool::space::Field {
            name: field.name,
            field_type: field.r#type,
            is_nullable: field.is_nullable,
        }
    }
}

::tarantool::define_str_enum! {
    #[derive(Default)]
    pub enum DistributionParam {
        #[default]
        Global = "global",
        Sharded = "sharded",
    }
}

#[derive(Clone, Debug)]
pub struct CreateProcParams {
    pub name: String,
    pub params: RoutineParams,
    pub language: RoutineLanguage,
    pub body: String,
    pub security: RoutineSecurity,
    pub owner: UserId,
}

impl CreateProcParams {
    pub fn func_exists(&self) -> bool {
        let func_space = Space::from(SystemSpace::Func);

        let name_idx = func_space
            .index_cached("name")
            .expect("_function should have an index by name");
        let t = name_idx
            .get(&[&self.name])
            .expect("reading from _function shouldn't fail");
        t.is_some()
    }

    pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
        let routine = storage.routines.by_name(&self.name)?;
        if let Some(def) = routine {
            if def.kind != RoutineKind::Procedure {
                return Err(CreateRoutineError::ExistsWithDifferentKind { name: def.name })?;
            }
            if def.params != self.params {
                return Err(CreateRoutineError::ExistsWithDifferentParams { name: def.name })?;
            }
            if def.language != self.language {
                return Err(CreateRoutineError::ExistsWithDifferentLanguage { name: def.name })?;
            }
            if def.body != self.body {
                return Err(CreateRoutineError::ExistsWithDifferentBody { name: def.name })?;
            }
            if def.security != self.security {
                return Err(CreateRoutineError::ExistsWithDifferentSecurity { name: def.name })?;
            }
            if def.owner != self.owner {
                return Err(CreateRoutineError::ExistsWithDifferentOwner { name: def.name })?;
            }
        }
        Ok(())
    }
}

#[derive(Clone, Debug, LuaRead)]
pub struct CreateTableParams {
    pub(crate) id: Option<SpaceId>,
    pub(crate) name: String,
    pub(crate) format: Vec<Field>,
    pub(crate) primary_key: Vec<String>,
    pub(crate) distribution: DistributionParam,
    pub(crate) by_field: Option<String>,
    pub(crate) sharding_key: Option<Vec<String>>,
    pub(crate) sharding_fn: Option<ShardingFn>,
    pub(crate) engine: Option<SpaceEngineType>,
    pub(crate) owner: UserId,
    /// Timeout in seconds.
    ///
    /// Specifying the timeout identifies how long user is ready to wait for ddl to be applied.
    /// But it does not provide guarantees that a ddl will be aborted if wait for commit timeouts.
    pub timeout: Option<f64>,
}

impl CreateTableParams {
    /// Checks if space described by options already exists. Returns an error if
    /// the space with given id exists, but has a different name.
    pub fn space_exists(&self) -> traft::Result<bool> {
        // The check is performed using `box.space` API, so that local spaces are counted too.
        let sys_space = Space::from(SystemSpace::Space);

        let Some(id) = self.id else {
            let sys_space_by_name = sys_space
                .index_cached("name")
                .expect("_space should have an index by name");
            let t = sys_space_by_name
                .get(&[&self.name])
                .expect("reading from _space shouldn't fail");
            return Ok(t.is_some());
        };

        let t = sys_space
            .get(&[id])
            .expect("reading from _space shouldn't fail");
        let Some(t) = t else {
            return Ok(false);
        };

        let existing_name: &str = t.get("name").expect("space metadata should contain a name");
        if existing_name == self.name {
            return Ok(true);
        } else {
            // TODO: check everything else is the same
            // https://git.picodata.io/picodata/picodata/picodata/-/issues/331
            return Err(CreateTableError::ExistsWithDifferentName {
                id,
                expected_name: self.name.clone(),
                actual_name: existing_name.into(),
            }
            .into());
        }
    }

    pub fn validate(&self) -> traft::Result<()> {
        // Check space id fits in the allowed range
        if let Some(id) = self.id {
            if id <= SPACE_ID_INTERNAL_MAX {
                crate::tlog!(Warning, "requested space id {id} is in the range 0..={SPACE_ID_INTERNAL_MAX} reserved for future use by picodata, you may have a conflict in a future version");
            }
        }
        // All field names are unique
        let mut field_names = HashSet::new();
        for field in &self.format {
            if !field_names.insert(field.name.as_str()) {
                return Err(CreateTableError::DuplicateFieldName(field.name.clone()).into());
            }
        }
        // All primary key components exist in fields
        for part in &self.primary_key {
            if !field_names.contains(part.as_str()) {
                return Err(CreateTableError::FieldUndefined(part.clone()).into());
            }
        }
        // Global spaces must have memtx engine
        if self.distribution == DistributionParam::Global
            && self.engine.is_some_and(|e| e != SpaceEngineType::Memtx)
        {
            return Err(CreateTableError::IncompatibleGlobalSpaceEngine.into());
        }
        // All sharding key components exist in fields
        if self.distribution == DistributionParam::Sharded {
            match (&self.by_field, &self.sharding_key) {
                (Some(by_field), None) => {
                    if !field_names.contains(by_field.as_str()) {
                        return Err(CreateTableError::FieldUndefined(by_field.clone()).into());
                    }
                    if self.sharding_fn.is_some() {
                        crate::tlog!(
                            Warning,
                            "`sharding_fn` is specified but will be ignored, as sharding `by_field` is set"
                        );
                    }
                }
                (None, Some(sharding_key)) => {
                    let mut parts = HashSet::new();
                    for part in sharding_key {
                        if !field_names.contains(part.as_str()) {
                            return Err(CreateTableError::FieldUndefined(part.clone()).into());
                        }
                        // And all parts are unique
                        if !parts.insert(part.as_str()) {
                            return Err(CreateTableError::DuplicateFieldName(part.clone()).into());
                        }
                    }
                }
                (None, None) => return Err(CreateTableError::ShardingPolicyUndefined.into()),
                (Some(_), Some(_)) => {
                    return Err(CreateTableError::ConflictingShardingPolicy.into())
                }
            }
        } else {
            if self.by_field.is_some() {
                crate::tlog!(
                    Warning,
                    "`by_field` is specified but will be ignored, as `distribution` is `global`"
                );
            }
            if self.sharding_key.is_some() {
                crate::tlog!(
                    Warning,
                    "`sharding_key` is specified but will be ignored, as `distribution` is `global`"
                );
            }
            if self.sharding_fn.is_some() {
                crate::tlog!(
                    Warning,
                    "`sharding_fn` is specified but will be ignored, as `distribution` is `global`"
                );
            }
        }
        Ok(())
    }

    /// Create space and then rollback.
    ///
    /// Should be used for checking if a space with these params can be created.
    pub fn test_create_space(&self, storage: &Clusterwide) -> traft::Result<()> {
        let id = self.id.expect("space id should've been chosen by now");
        let user = storage
            .users
            .by_id(self.owner)?
            .ok_or_else(|| Error::Other(format!("user with id {} not found", self.owner).into()))?
            .name;
        let err = transaction(|| -> Result<(), Option<tarantool::error::Error>> {
            // TODO: allow create_space to accept user by id
            ::tarantool::schema::space::create_space(
                &self.name,
                &SpaceCreateOptions {
                    if_not_exists: false,
                    engine: self.engine.unwrap_or_default(),
                    id: Some(id),
                    field_count: self.format.len() as u32,
                    user: Some(user),
                    space_type: SpaceType::Normal,
                    format: Some(
                        self.format
                            .iter()
                            .cloned()
                            .map(tarantool::space::Field::from)
                            .collect(),
                    ),
                },
            )
            .map_err(Some)?;
            // Rollback space creation
            Err(None)
        })
        .unwrap_err();
        match err {
            // Space was successfully created and rolled back
            TransactionError::RolledBack(None) => Ok(()),
            // Space creation failed
            TransactionError::RolledBack(Some(err)) => Err(err.into()),
            // Error during commit or rollback
            err => panic!("transaction mechanism should not fail: {err:?}"),
        }
    }

    /// Chooses an id for the new space if it's not set yet and sets `self.id`.
    pub fn choose_id_if_not_specified(&mut self) -> traft::Result<()> {
        let sys_space = Space::from(SystemSpace::Space);

        let id = if let Some(id) = self.id {
            id
        } else {
            let id_range_min = SPACE_ID_INTERNAL_MAX + 1;
            let id_range_max = SPACE_ID_TEMPORARY_MIN;

            let mut iter = sys_space.select(IteratorType::LT, &[id_range_max])?;
            let tuple = iter.next().expect("there's always at least system spaces");
            let mut max_id: SpaceId = tuple
                .field(0)
                .expect("space metadata should decode fine")
                .expect("space id should always be present");

            let find_next_unused_id = |start: SpaceId| -> Result<SpaceId, Error> {
                let iter = sys_space.select(IteratorType::GE, &[start])?;
                let mut next_id = start;
                for tuple in iter {
                    let id: SpaceId = tuple
                        .field(0)
                        .expect("space metadata should decode fine")
                        .expect("space id should always be present");
                    if id != next_id {
                        // Found a hole in the id range.
                        return Ok(next_id);
                    }
                    next_id += 1;
                }
                Ok(next_id)
            };

            if max_id < id_range_min {
                max_id = id_range_min;
            }

            let mut id = find_next_unused_id(max_id)?;
            if id >= id_range_max {
                id = find_next_unused_id(id_range_min)?;
                if id >= id_range_max {
                    set_error!(TarantoolErrorCode::CreateSpace, "space id limit is reached");
                    return Err(TarantoolError::last().into());
                }
            }
            id
        };
        self.id = Some(id);
        Ok(())
    }

    pub fn into_ddl(self) -> traft::Result<Ddl> {
        let id = self.id.expect("space id should've been chosen by now");
        let primary_key: Vec<_> = self.primary_key.into_iter().map(Part::field).collect();
        let format: Vec<_> = self
            .format
            .into_iter()
            .map(tarantool::space::Field::from)
            .collect();
        let distribution = match self.distribution {
            DistributionParam::Global => Distribution::Global,
            DistributionParam::Sharded => {
                if let Some(field) = self.by_field {
                    Distribution::ShardedByField { field }
                } else {
                    Distribution::ShardedImplicitly {
                        sharding_key: self
                            .sharding_key
                            .expect("should be checked during `validate`"),
                        sharding_fn: self.sharding_fn.unwrap_or_default(),
                    }
                }
            }
        };
        let res = Ddl::CreateTable {
            id,
            name: self.name,
            format,
            primary_key,
            distribution,
            engine: self.engine.unwrap_or_default(),
            owner: self.owner,
        };
        Ok(res)
    }
}

/// Minimum id in the range of ids reserved for temporary spaces.
///
/// Temporary spaces need a special range of ids to avoid conflicts with
/// spaces defined on replicas. This value is defined in tarantool, see
/// <https://git.picodata.io/picodata/tarantool/-/blob/5c3c8ed32c7a9c84a0e86c8453269f0925ce63ed/src/box/schema_def.h#L67>
const SPACE_ID_TEMPORARY_MIN: SpaceId = 1 << 30;

/// Waits for a pending ddl to be either `Committed` or `Aborted` by the governor.
///
/// Returns an index of the corresponding `DdlCommit` entry.
///
/// If `timeout` is reached earlier returns an error.
pub fn wait_for_ddl_commit(
    prepare_commit: RaftIndex,
    timeout: Duration,
) -> traft::Result<RaftIndex> {
    let node = node::global()?;
    let raft_storage = &node.raft_storage;
    let deadline = fiber::clock().saturating_add(timeout);
    let last_seen = prepare_commit;
    loop {
        let cur_applied = node.get_index();
        let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1, None)?;
        for entry in new_entries {
            if entry.entry_type != raft::prelude::EntryType::EntryNormal {
                continue;
            }
            let index = entry.index;
            let op = entry.into_op().unwrap_or(Op::Nop);
            match op {
                Op::DdlCommit => return Ok(index),
                Op::DdlAbort => return Err(DdlError::Aborted.into()),
                _ => (),
            }
        }

        node.wait_index(cur_applied + 1, deadline.duration_since(fiber::clock()))?;
    }
}

/// Aborts a pending DDL operation and waits for abort to be committed localy.
/// If `timeout` is reached earlier returns an error.
///
/// Returns an index of the corresponding DdlAbort raft entry, or an error if
/// there is no pending DDL operation.
pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> {
    let node = node::global()?;
    loop {
        if node.storage.properties.pending_schema_change()?.is_none() {
            return Err(DdlError::NoPendingDdl.into());
        }
        let index = node.get_index();
        let term = raft::Storage::term(&node.raft_storage, index)?;
        #[rustfmt::skip]
        let predicate = cas::Predicate {
            index,
            term,
            ranges: vec![
                cas::Range::new(ClusterwideTable::Property).eq([PropertyName::PendingSchemaChange]),
                cas::Range::new(ClusterwideTable::Property).eq([PropertyName::GlobalSchemaVersion]),
                cas::Range::new(ClusterwideTable::Property).eq([PropertyName::NextSchemaVersion]),
            ],
        };

        let (index, term) =
            compare_and_swap(Op::DdlAbort, predicate, effective_user_id(), timeout)?;
        node.wait_index(index, timeout)?;
        if raft::Storage::term(&node.raft_storage, index)? != term {
            // leader switched - retry
            continue;
        }
        return Ok(index);
    }
}

mod tests {
    use tarantool::{auth::AuthMethod, space::FieldType};

    use super::*;

    fn storage() -> Clusterwide {
        let storage = Clusterwide::for_tests();
        storage
            .users
            .insert(&UserDef {
                id: ADMIN_ID,
                name: String::from("admin"),
                schema_version: 0,
                auth: AuthDef::new(AuthMethod::ChapSha1, String::from("")),
                owner: ADMIN_ID,
            })
            .unwrap();
        storage
    }

    #[::tarantool::test]
    fn test_create_space() {
        let storage = storage();
        CreateTableParams {
            id: Some(1337),
            name: "friends_of_peppa".into(),
            format: vec![
                Field {
                    name: "id".into(),
                    r#type: FieldType::Number,
                    is_nullable: false,
                },
                Field {
                    name: "name".into(),
                    r#type: FieldType::String,
                    is_nullable: false,
                },
            ],
            primary_key: vec![],
            distribution: DistributionParam::Global,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .test_create_space(&storage)
        .unwrap();
        assert!(tarantool::space::Space::find("friends_of_peppa").is_none());

        CreateTableParams {
            id: Some(1337),
            name: "friends_of_peppa".into(),
            format: vec![],
            primary_key: vec![],
            distribution: DistributionParam::Sharded,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: Some(SpaceEngineType::Vinyl),
            timeout: None,
            owner: ADMIN_ID,
        }
        .test_create_space(&storage)
        .unwrap();
        assert!(tarantool::space::Space::find("friends_of_peppa").is_none());

        let err = CreateTableParams {
            id: Some(0),
            name: "friends_of_peppa".into(),
            format: vec![],
            primary_key: vec![],
            distribution: DistributionParam::Global,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .test_create_space(&storage)
        .unwrap_err();
        assert_eq!(
            err.to_string(),
            "tarantool error: CreateSpace: Failed to create space 'friends_of_peppa': space id 0 is reserved"
        );
    }

    #[::tarantool::test]
    fn ddl() {
        let new_space = "new_space";
        let new_id = 1;
        let field1 = Field {
            name: "field1".into(),
            r#type: FieldType::Any,
            is_nullable: false,
        };
        let field2 = Field {
            name: "field2".into(),
            r#type: FieldType::Any,
            is_nullable: false,
        };

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone(), field1.clone()],
            primary_key: vec![],
            distribution: DistributionParam::Global,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(err.to_string(), "several fields have the same name: field1");

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone()],
            primary_key: vec![field2.name.clone()],
            distribution: DistributionParam::Global,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(err.to_string(), "no field with name: field2");

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone()],
            primary_key: vec![],
            distribution: DistributionParam::Sharded,
            by_field: None,
            sharding_key: Some(vec![field2.name.clone()]),
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(err.to_string(), "no field with name: field2");

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone()],
            primary_key: vec![],
            distribution: DistributionParam::Sharded,
            by_field: None,
            sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]),
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(err.to_string(), "several fields have the same name: field1");

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone()],
            primary_key: vec![],
            distribution: DistributionParam::Sharded,
            by_field: Some(field2.name.clone()),
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(err.to_string(), "no field with name: field2");

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone()],
            primary_key: vec![],
            distribution: DistributionParam::Sharded,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(
            err.to_string(),
            "distribution is `sharded`, but neither `by_field` nor `sharding_key` is set"
        );

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone()],
            primary_key: vec![],
            distribution: DistributionParam::Sharded,
            by_field: Some(field2.name.clone()),
            sharding_key: Some(vec![]),
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(
            err.to_string(),
            "only one of sharding policy fields (`by_field`, `sharding_key`) should be set"
        );

        CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1.clone(), field2.clone()],
            primary_key: vec![field2.name.clone()],
            distribution: DistributionParam::Sharded,
            by_field: Some(field2.name.clone()),
            sharding_key: None,
            sharding_fn: None,
            engine: None,
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap();

        let err = CreateTableParams {
            id: Some(new_id),
            name: new_space.into(),
            format: vec![field1, field2.clone()],
            primary_key: vec![field2.name],
            distribution: DistributionParam::Global,
            by_field: None,
            sharding_key: None,
            sharding_fn: None,
            engine: Some(SpaceEngineType::Vinyl),
            timeout: None,
            owner: ADMIN_ID,
        }
        .validate()
        .unwrap_err();
        assert_eq!(err.to_string(), "global spaces only support memtx engine");
    }

    #[::tarantool::test]
    fn test_space_id_temporary_min() {
        let lua = tarantool::lua_state();
        let id: SpaceId = lua
            .eval("return box.schema.SPACE_ID_TEMPORARY_MIN")
            .unwrap();
        assert_eq!(id, SPACE_ID_TEMPORARY_MIN);
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use tarantool::tuple::ToTupleBuffer;

    #[test]
    #[rustfmt::skip]
    fn space_def_matches_format() {
        let i = TableDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = TableDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "TableDef::format");

        assert_eq!(format[TableDef::FIELD_OPERABLE].name, "operable");
    }

    #[test]
    #[rustfmt::skip]
    fn index_def_matches_format() {
        let i = IndexDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = IndexDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "IndexDef::format");

        assert_eq!(format[IndexDef::FIELD_OPERABLE].name, "operable");
    }

    #[test]
    #[rustfmt::skip]
    fn user_def_matches_format() {
        let i = UserDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = UserDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "UserDef::format");

        assert_eq!(format[UserDef::FIELD_AUTH].name, "auth");
    }

    #[test]
    #[rustfmt::skip]
    fn role_def_matches_format() {
        let i = RoleDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = RoleDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "RoleDef::format");
    }

    #[test]
    #[rustfmt::skip]
    fn privilege_def_matches_format() {
        let i = PrivilegeDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = PrivilegeDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PrivilegeDef::format");
    }

    #[track_caller]
    fn check_object_privilege(
        object_type: SchemaObjectType,
        valid_privileges: &'static [PrivilegeType],
        object_id: i64,
    ) {
        for privilege in PrivilegeType::VARIANTS {
            let res = PrivilegeDef::new(*privilege, object_type, object_id, 1, 1, 1);

            if valid_privileges.contains(privilege) {
                assert!(res.is_ok())
            } else {
                assert_eq!(
                    res.unwrap_err(),
                    InvalidPrivilegeError {
                        object_type,
                        unsupported: *privilege,
                        expected_one_of: valid_privileges,
                    }
                )
            }
        }
    }

    #[test]
    fn privilege_def_validation() {
        use PrivilegeType::*;

        // table
        let valid = &[Read, Write, Create, Alter, Drop];
        check_object_privilege(SchemaObjectType::Table, valid, -1);

        // particular table
        let valid = &[Read, Write, Alter, Drop];
        check_object_privilege(SchemaObjectType::Table, valid, 42);

        // user
        let valid = &[Create, Alter, Drop];
        check_object_privilege(SchemaObjectType::User, valid, -1);

        // particular user
        let valid = &[Alter, Drop];
        check_object_privilege(SchemaObjectType::User, valid, 42);

        // role
        let valid = &[Create, Drop];
        check_object_privilege(SchemaObjectType::Role, valid, -1);

        // particular role
        let valid = &[Execute, Drop];
        check_object_privilege(SchemaObjectType::Role, valid, 42);

        // universe
        let valid = &[Login];
        check_object_privilege(SchemaObjectType::Universe, valid, 0);
    }

    #[test]
    fn routine_def_matches_format() {
        let i = RoutineDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = RoutineDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "RoutineDef::format");
    }
}