use sbroad::ir::ddl::Language;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::fmt::Display;
use std::time::Duration;
use tarantool::auth::AuthData;
use tarantool::auth::AuthDef;
use tarantool::auth::AuthMethod;
use tarantool::error::TarantoolError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::session::UserId;
use tarantool::set_error;
use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
use tarantool::space::{Metadata as SpaceMetadata, Space, SpaceType, SystemSpace};
use tarantool::transaction::{transaction, TransactionError};
use tarantool::{
index::IteratorType,
index::Metadata as IndexMetadata,
index::{IndexId, Part},
space::SpaceId,
tlua::{self, LuaRead},
tuple::Encode,
util::Value,
};
use sbroad::ir::value::Value as IrValue;
use serde::{Deserialize, Serialize};
use crate::cas::{self, compare_and_swap};
use crate::storage;
use crate::storage::{Clusterwide, SPACE_ID_INTERNAL_MAX};
use crate::storage::{ClusterwideTable, PropertyName};
use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op};
use crate::traft::{self, node, RaftIndex};
use crate::util::effective_user_id;
/// The initial local schema version. Immediately after the cluster is bootstrapped
/// it has this schema version.
///
/// If a schema definition is marked with this version, it means the schema
/// definition is builtin and is applied by default on all instances.
pub const INITIAL_SCHEMA_VERSION: u64 = 0;
////////////////////////////////////////////////////////////////////////////////
// TableDef
////////////////////////////////////////////////////////////////////////////////
/// Database table definition.
///
/// Describes a user-defined table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableDef {
pub id: SpaceId,
pub name: String,
pub distribution: Distribution,
pub format: Vec<tarantool::space::Field>,
pub schema_version: u64,
pub operable: bool,
pub engine: SpaceEngineType,
pub owner: UserId,
}
impl Encode for TableDef {}
impl TableDef {
/// Index (0-based) of field "operable" in the _pico_table table format.
pub const FIELD_OPERABLE: usize = 5;
/// Format of the _pico_table global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)),
Field::from(("name", FieldType::String)),
Field::from(("distribution", FieldType::Array)),
Field::from(("format", FieldType::Array)),
Field::from(("schema_version", FieldType::Unsigned)),
Field::from(("operable", FieldType::Boolean)),
Field::from(("engine", FieldType::String)),
Field::from(("owner", FieldType::Unsigned)),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 10569,
name: "stuff".into(),
distribution: Distribution::Global,
format: vec![],
schema_version: 420,
operable: true,
engine: SpaceEngineType::Blackhole,
owner: 42,
}
}
pub fn to_space_metadata(&self) -> traft::Result<SpaceMetadata> {
let format = fields_to_format(&self.format);
let mut flags = BTreeMap::new();
if matches!(self.distribution, Distribution::Global) {
flags.insert("group_id".into(), 1.into());
}
let space_def = SpaceMetadata {
id: self.id,
user_id: self.owner,
name: self.name.as_str().into(),
engine: self.engine,
field_count: 0,
flags,
format,
};
Ok(space_def)
}
}
/// Definitions of builtin tables & their respective indexes.
/// These should be inserted into "_pico_table" & "_pico_index" at cluster bootstrap.
pub fn system_table_definitions() -> Vec<(TableDef, Vec<IndexDef>)> {
let mut result = Vec::with_capacity(ClusterwideTable::all_tables().len());
for sys_table in ClusterwideTable::all_tables() {
let table_def = TableDef {
id: sys_table.id(),
name: sys_table.name().into(),
distribution: Distribution::Global,
format: sys_table.format(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
operable: true,
engine: SpaceEngineType::Memtx,
owner: ADMIN_ID,
};
let index_defs = sys_table.index_definitions();
result.push((table_def, index_defs));
}
// TODO: there's also "_raft_log" & "_raft_state" spaces, but we don't treat
// them the same as others for some reason?
result
}
// FIXME: move this to tarantool-module
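/// Converts a slice of [`tarantool::space::Field`]s into the map-based format
/// stored in space metadata (a list of `{name, type, is_nullable}` maps).
///
/// A minimal sketch of the resulting shape (illustrative only):
///
/// ```ignore
/// let format = fields_to_format(&[Field::from(("id", FieldType::Unsigned))]);
/// // format[0] is a map: { "name": "id", "type": "unsigned", "is_nullable": false }
/// ```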
pub fn fields_to_format(
fields: &[tarantool::space::Field],
) -> Vec<BTreeMap<Cow<'static, str>, Value<'_>>> {
let mut result = Vec::with_capacity(fields.len());
for field in fields {
let mut field_map = BTreeMap::new();
field_map.insert("name".into(), Value::Str(field.name.as_str().into()));
field_map.insert("type".into(), Value::Str(field.field_type.as_str().into()));
field_map.insert("is_nullable".into(), Value::Bool(field.is_nullable));
result.push(field_map);
}
result
}
////////////////////////////////////////////////////////////////////////////////
// Distribution
////////////////////////////////////////////////////////////////////////////////
/// Defines how to distribute tuples in a table across replicasets.
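///
/// The variant is encoded with a `kind` tag when (de)serialized. A minimal
/// sketch of a sharded distribution and its approximate serialized form
/// (illustrative only):
///
/// ```ignore
/// let d = Distribution::ShardedImplicitly {
///     sharding_key: vec!["id".into()],
///     sharding_fn: ShardingFn::Murmur3,
/// };
/// // Serializes roughly as:
/// // { "kind": "sharded_implicitly", "sharding_key": ["id"], "sharding_fn": "murmur3" }
/// ```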
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "kind")]
pub enum Distribution {
/// Tuples will be replicated to each instance.
Global,
/// Tuples will be implicitly sharded, i.e. sent to the bucket
/// determined by a hash of the provided `sharding_key`.
ShardedImplicitly {
sharding_key: Vec<String>,
#[serde(default)]
sharding_fn: ShardingFn,
},
/// Tuples will be explicitly sharded, i.e. sent to the bucket
/// whose id is provided by the field specified here.
///
/// Default field name: "bucket_id"
ShardedByField {
#[serde(default = "default_bucket_id_field")]
field: String,
},
}
fn default_bucket_id_field() -> String {
"bucket_id".into()
}
::tarantool::define_str_enum! {
/// Custom sharding functions are not yet supported.
#[derive(Default)]
pub enum ShardingFn {
Crc32 = "crc32",
#[default]
Murmur3 = "murmur3",
Xxhash = "xxhash",
Md5 = "md5",
}
}
////////////////////////////////////////////////////////////////////////////////
// IndexDef
////////////////////////////////////////////////////////////////////////////////
/// Database index definition.
///
/// Describes a user-defined index.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexDef {
pub table_id: SpaceId,
pub id: IndexId,
pub name: String,
// TODO: document this field's meaning, for now it seems it's always `true`
pub local: bool,
pub parts: Vec<Part>,
pub schema_version: u64,
pub operable: bool,
pub unique: bool,
}
impl Encode for IndexDef {}
impl IndexDef {
/// Index (0-based) of field "operable" in _pico_index table format.
pub const FIELD_OPERABLE: usize = 6;
/// Format of the _pico_index global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("table_id", FieldType::Unsigned)),
Field::from(("id", FieldType::Unsigned)),
Field::from(("name", FieldType::String)),
Field::from(("local", FieldType::Boolean)),
Field::from(("parts", FieldType::Array)),
Field::from(("schema_version", FieldType::Unsigned)),
Field::from(("operable", FieldType::Boolean)),
Field::from(("unique", FieldType::Boolean)),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
table_id: 10569,
id: 1,
name: "secondary".into(),
local: true,
parts: vec![],
schema_version: 420,
operable: true,
unique: false,
}
}
pub fn to_index_metadata(&self) -> IndexMetadata {
use tarantool::index::IndexType;
let mut opts = BTreeMap::new();
opts.insert(Cow::from("unique"), Value::Bool(self.unique));
let index_meta = IndexMetadata {
space_id: self.table_id,
index_id: self.id,
name: self.name.as_str().into(),
r#type: IndexType::Tree,
opts,
parts: self.parts.clone(),
};
index_meta
}
}
////////////////////////////////////////////////////////////////////////////////
// UserDef
////////////////////////////////////////////////////////////////////////////////
/// User definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserDef {
pub id: UserId,
pub name: String,
pub schema_version: u64,
pub auth: AuthDef,
pub owner: UserId,
}
impl Encode for UserDef {}
impl UserDef {
/// Index of field "auth" in the space _pico_user format.
///
/// The index of the first field is 0.
pub const FIELD_AUTH: usize = 3;
/// Format of the _pico_user global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)),
Field::from(("name", FieldType::String)),
Field::from(("schema_version", FieldType::Unsigned)),
Field::from(("auth", FieldType::Array)),
Field::from(("owner", FieldType::Unsigned)),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 69,
name: "david".into(),
schema_version: 421,
auth: AuthDef::new(tarantool::auth::AuthMethod::ChapSha1, "".into()),
owner: 42,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// RoleDef
////////////////////////////////////////////////////////////////////////////////
/// Role definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoleDef {
pub id: UserId,
pub name: String,
pub schema_version: u64,
pub owner: UserId,
}
impl Encode for RoleDef {}
impl RoleDef {
/// Format of the _pico_role global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)),
Field::from(("name", FieldType::String)),
Field::from(("schema_version", FieldType::Unsigned)),
Field::from(("owner", FieldType::Unsigned)),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 13,
name: "devops".into(),
schema_version: 419,
owner: 42,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// PrivilegeDef
////////////////////////////////////////////////////////////////////////////////
/// User id of the builtin user "guest".
///
/// Default "untrusted" user.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const GUEST_ID: UserId = 0;
/// User id of the builtin user "admin".
///
/// Note: the admin user id is used when we need to elevate privileges
/// because the current user doesn't have access to system spaces.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const ADMIN_ID: UserId = 1;
/// User id of the builtin role "public".
///
/// Pre-defined role, automatically granted to new users.
/// Granting a privilege to this role is equivalent to granting the privilege to everybody.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const PUBLIC_ID: UserId = 2;
/// User id of the builtin role "replication".
///
/// Role "replication" has the following grants:
/// - Read access to the "universe"
/// - Write access to the space "_cluster"
pub const ROLE_REPLICATION_ID: i64 = 3;
/// User id of the builtin role "super".
///
/// Users with this role have access to everything.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const SUPER_ID: UserId = 31;
/// User id of the builtin user "pico_service".
///
/// A special user for internal communication between instances of picodata.
/// It is equivalent in its privileges to "admin". The only difference is that
/// only automated rpc calls are performed as "pico_service".
pub const PICO_SERVICE_ID: UserId = 32;
/// Object id of the special builtin object "universe".
///
/// Object "universe" is basically an alias to the "whole database".
/// Granting access to "universe" is equivalent to granting access
/// to all objects of all types for which the privilege makes sense.
pub const UNIVERSE_ID: i64 = 0;
tarantool::define_str_enum! {
pub enum SchemaObjectType {
Table = "table",
Role = "role",
Routine = "routine",
User = "user",
Universe = "universe",
}
}
impl SchemaObjectType {
pub fn as_tarantool(&self) -> &'static str {
match self {
SchemaObjectType::Table => "space",
t => t.as_str(),
}
}
}
tarantool::define_str_enum! {
/// Picodata privilege types. For correspondence with
/// tarantool privilege types see [`PrivilegeType::as_tarantool`].
///
/// For each variant it is described which SQL queries in picodata
/// a user with this privilege can execute.
pub enum PrivilegeType {
/// Allows SQL queries: `SELECT`
Read = "read",
/// Allows SQL queries: `INSERT`, `UPDATE`, `DELETE`
Write = "write",
/// Allows SQL queries: `CALL`.
/// Can also indicate that a role is granted to a user
/// when the object type is a role.
Execute = "execute",
/// Allows a user to log in when connecting to a picodata instance
Login = "login",
/// Allows SQL queries: `CREATE`
Create = "create",
/// Allows SQL queries: `DROP`
Drop = "drop",
/// Allows SQL queries: `ALTER`
Alter = "alter",
}
}
impl PrivilegeType {
/// Converts picodata privilege to a string that can be passed to
/// `box.schema.user.grant` as `privileges`
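///
/// A small sketch of the mapping (only `Login` differs from its picodata name):
///
/// ```ignore
/// assert_eq!(PrivilegeType::Login.as_tarantool(), "usage");
/// assert_eq!(PrivilegeType::Read.as_tarantool(), "read");
/// ```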
pub fn as_tarantool(&self) -> &'static str {
match self {
PrivilegeType::Login => "usage",
t => t.as_str(),
}
}
}
/// Privilege definition.
///
/// Note the differences between picodata privileges and vanilla tarantool ones:
/// 1) The "super" role in picodata is a placeholder. It exists because picodata reuses
///    user ids from vanilla tarantool and "super" occupies one of them, so we keep it as
///    a placeholder to avoid assigning the id of the vanilla "super" role to some picodata
///    user. Aside from that, the vanilla "super" role has some privileges that cannot be
///    represented in picodata, namely privileges on "universe". This hole opens the
///    possibility of unwanted edge cases.
/// 2) Only Session and Usage can be granted on Universe.
///
/// Note that validation is not performed in Deserialize. We assume that untrusted data
/// is only turned into this object via the constructor. In other cases validation was
/// already performed prior to serialization, thus deserialization always creates a valid
/// object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PrivilegeDef {
privilege: PrivilegeType,
object_type: SchemaObjectType,
/// `-1` denotes the absence of a target object.
/// Other values should be >= 0 and denote an existing target object.
/// When working with `object_type` `universe` it might seem that it does not
/// have a target object and `object_id` should be `-1`; this is incorrect:
/// universe has a target object with `object_id == 0`.
///
/// To get the value of this field as `Option<u32>` see [`Self::object_id`]
object_id: i64,
/// Id of the user or role to whom the privilege is granted.
///
/// In tarantool users and roles are stored in the same space, which means a
/// role and a user cannot have the same id or name.
grantee_id: UserId,
grantor_id: UserId,
schema_version: u64,
}
impl Encode for PrivilegeDef {}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub struct InvalidPrivilegeError {
object_type: SchemaObjectType,
unsupported: PrivilegeType,
expected_one_of: &'static [PrivilegeType],
}
impl Display for InvalidPrivilegeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"Unsupported {} privilege '{}', expected one of {:?}",
self.object_type, self.unsupported, self.expected_one_of,
))
}
}
impl PrivilegeDef {
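/// Creates a new [`PrivilegeDef`], validating that `privilege` is applicable
/// to the given `object_type` and target object.
///
/// A minimal sketch of the validation behavior (not compiled, ids are
/// arbitrary examples):
///
/// ```ignore
/// // `Execute` on a particular role is allowed:
/// assert!(PrivilegeDef::new(PrivilegeType::Execute, SchemaObjectType::Role, 3, GUEST_ID, ADMIN_ID, 1).is_ok());
/// // `Write` on a role is not, so this returns an `InvalidPrivilegeError`:
/// assert!(PrivilegeDef::new(PrivilegeType::Write, SchemaObjectType::Role, 3, GUEST_ID, ADMIN_ID, 1).is_err());
/// ```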
pub fn new(
privilege: PrivilegeType,
object_type: SchemaObjectType,
object_id: i64,
grantee_id: UserId,
grantor_id: UserId,
schema_version: u64,
) -> Result<PrivilegeDef, InvalidPrivilegeError> {
let privilege_def = PrivilegeDef {
privilege,
object_type,
object_id,
grantee_id,
grantor_id,
schema_version,
};
use PrivilegeType::*;
let valid_privileges: &[PrivilegeType] = match privilege_def.object_type {
SchemaObjectType::Table => match privilege_def.object_id() {
Some(_) => &[Read, Write, Alter, Drop],
None => &[Read, Write, Create, Alter, Drop],
},
SchemaObjectType::Role => match privilege_def.object_id() {
Some(_) => &[Execute, Drop],
None => &[Create, Drop],
},
SchemaObjectType::User => match privilege_def.object_id() {
Some(_) => &[Alter, Drop],
None => &[Create, Alter, Drop],
},
SchemaObjectType::Universe => &[Login],
SchemaObjectType::Routine => match privilege_def.object_id() {
Some(_) => &[Execute, Drop],
None => &[Create, Drop],
},
};
if !valid_privileges.contains(&privilege) {
return Err(InvalidPrivilegeError {
object_type,
unsupported: privilege,
expected_one_of: valid_privileges,
});
}
Ok(privilege_def)
}
#[inline(always)]
pub fn privilege(&self) -> PrivilegeType {
self.privilege
}
#[inline(always)]
pub fn object_type(&self) -> SchemaObjectType {
self.object_type
}
/// Get `object_id` field interpreting `-1` as `None`.
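///
/// A small sketch (not compiled; the dummy value from [`Self::for_tests`] has
/// `object_id == -1`):
///
/// ```ignore
/// assert_eq!(PrivilegeDef::for_tests().object_id(), None);
/// ```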
#[inline(always)]
pub fn object_id(&self) -> Option<u32> {
if self.object_id >= 0 {
Some(self.object_id as _)
} else {
debug_assert_eq!(self.object_id, -1, "object_id should be >= -1");
None
}
}
#[inline(always)]
pub fn object_id_raw(&self) -> i64 {
self.object_id
}
#[inline(always)]
pub fn grantee_id(&self) -> UserId {
self.grantee_id
}
#[inline(always)]
pub fn grantor_id(&self) -> UserId {
self.grantor_id
}
#[inline(always)]
pub fn schema_version(&self) -> u64 {
self.schema_version
}
/// Format of the _pico_privilege global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("privilege", FieldType::String)),
Field::from(("object_type", FieldType::String)),
Field::from(("object_id", FieldType::Integer)),
Field::from(("grantee_id", FieldType::Unsigned)),
Field::from(("grantor_id", FieldType::Unsigned)),
Field::from(("schema_version", FieldType::Unsigned)),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
grantor_id: 13,
grantee_id: 37,
object_type: SchemaObjectType::User,
object_id: -1,
privilege: PrivilegeType::Create,
schema_version: 337,
}
}
/// Retrieves the object name from system tables based on `object_id` and `object_type`.
/// Returns `Ok(None)` when the privilege has no target object (i.e. `object_id == -1`)
/// or when the target object is the universe.
/// Returns `Err` when the target object was not found in the system tables.
///
/// # Panics
/// 1. On storage failure
pub fn resolve_object_name(&self, storage: &Clusterwide) -> Result<Option<String>, Error> {
let Some(id) = self.object_id() else {
return Ok(None);
};
let name = match self.object_type {
SchemaObjectType::Table => storage.tables.get(id).map(|t| t.map(|t| t.name)),
SchemaObjectType::Role => storage.roles.by_id(id).map(|t| t.map(|t| t.name)),
SchemaObjectType::User => storage.users.by_id(id).map(|t| t.map(|t| t.name)),
SchemaObjectType::Universe => {
debug_assert_eq!(self.object_id, 0);
return Ok(None);
}
SchemaObjectType::Routine => storage.routines.by_id(id).map(|t| t.map(|t| t.name)),
}
.expect("storage should not fail")
.ok_or_else(|| Error::other(format!("object with id {id} should exist")))?;
Ok(Some(name))
}
}
////////////////////////////////////////////////////////////////////////////////
// system_user_definitions
////////////////////////////////////////////////////////////////////////////////
/// Definitions of builtin users & their respective privileges.
/// These should be inserted into "_pico_user" & "_pico_privilege" at cluster bootstrap.
pub fn system_user_definitions() -> Vec<(UserDef, Vec<PrivilegeDef>)> {
let mut result = vec![];
let initiator = ADMIN_ID;
//
// User "guest"
//
// equivalent SQL expression: CREATE USER 'guest' WITH PASSWORD '' USING chap-sha1
{
let user_def = UserDef {
id: GUEST_ID,
name: "guest".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
auth: AuthDef::new(
AuthMethod::ChapSha1,
AuthData::new(&AuthMethod::ChapSha1, "guest", "").into_string(),
),
owner: initiator,
};
let priv_defs = vec![
PrivilegeDef {
grantee_id: user_def.id,
privilege: PrivilegeType::Login,
object_type: SchemaObjectType::Universe,
object_id: UNIVERSE_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
},
PrivilegeDef {
grantee_id: user_def.id,
privilege: PrivilegeType::Execute,
object_type: SchemaObjectType::Role,
object_id: PUBLIC_ID as _,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
},
];
result.push((user_def, priv_defs));
}
//
// User "admin"
//
{
let user_def = UserDef {
id: ADMIN_ID,
name: "admin".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
// This place slightly differs from the tarantool
// implementation. In vanilla tarantool the auth_def is an empty
// MP_MAP. Here, for simplicity given the available module api, we
// use ChapSha1 with an invalid password (it's impossible to get an
// empty string as the output of sha1).
auth: AuthDef::new(AuthMethod::ChapSha1, "".into()),
owner: initiator,
};
let mut priv_defs = Vec::with_capacity(PrivilegeType::VARIANTS.len());
// Grant all privileges on "universe" to "admin".
for &privilege in PrivilegeType::VARIANTS {
priv_defs.push(PrivilegeDef {
grantee_id: user_def.id,
privilege,
object_type: SchemaObjectType::Universe,
object_id: UNIVERSE_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
});
}
result.push((user_def, priv_defs));
}
{
let user_def = UserDef {
id: PICO_SERVICE_ID,
name: PICO_SERVICE_USER_NAME.into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
auth: AuthDef::new(
AuthMethod::ChapSha1,
tarantool::auth::AuthData::new(&AuthMethod::ChapSha1, PICO_SERVICE_USER_NAME, "")
.into_string(),
),
owner: initiator,
};
let mut priv_defs = Vec::with_capacity(PrivilegeType::VARIANTS.len() + 1);
// Grant all privileges on "universe" to "pico_service".
for &privilege in PrivilegeType::VARIANTS {
priv_defs.push(PrivilegeDef {
grantee_id: user_def.id,
privilege,
object_type: SchemaObjectType::Universe,
object_id: UNIVERSE_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
});
}
// The role "replication" needs to be granted explicitly.
priv_defs.push(PrivilegeDef {
grantee_id: user_def.id,
privilege: PrivilegeType::Execute,
object_type: SchemaObjectType::Role,
object_id: ROLE_REPLICATION_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
});
result.push((user_def, priv_defs));
}
result
}
/// Definitions of builtin roles & their respective privileges.
/// These should be inserted into "_pico_role" & "_pico_privilege" at cluster bootstrap.
// TODO: maybe this "_pico_role" should be merged with "_pico_user"
pub fn system_role_definitions() -> Vec<(RoleDef, Vec<PrivilegeDef>)> {
let mut result = vec![];
let public_def = RoleDef {
id: PUBLIC_ID,
name: "public".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
owner: ADMIN_ID,
};
let public_privs = vec![
// TODO:
// - we grant "execute" access to a bunch of stored procs
// - vanilla tarantool grants "read" access to a bunch of system spaces
];
result.push((public_def, public_privs));
let super_def = RoleDef {
id: SUPER_ID,
name: "super".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
owner: ADMIN_ID,
};
let super_privs = vec![
// Special role, its privileges are implicit
];
result.push((super_def, super_privs));
// There's also a "replication" builtin role, but we don't care about it?
result
}
////////////////////////////////////////////////////////////////////////////////
// init_pico_service
////////////////////////////////////////////////////////////////////////////////
/// Name of the special builtin user for internal communication between
/// instances. Its id is [`PICO_SERVICE_ID`].
///
/// Use this constant instead of literal "pico_service" so that it's easier to
/// find all the places where we refer to "pico_service".
pub const PICO_SERVICE_USER_NAME: &'static str = "pico_service";
pub fn init_user_pico_service() {
let sys_user = SystemSpace::User.as_space();
let sys_priv = SystemSpace::Priv.as_space();
let t = sys_user
.get(&[PICO_SERVICE_ID])
.expect("reading from _user shouldn't fail");
if t.is_some() {
// Already exists (instance restarted)
return;
}
let found = system_user_definitions()
.into_iter()
.find(|(user_def, _)| user_def.id == PICO_SERVICE_ID);
let Some((user_def, _)) = &found else {
panic!("Couldn't find definition for '{PICO_SERVICE_USER_NAME}' system user");
};
let res = storage::acl::on_master_create_user(user_def, false);
if let Err(e) = res {
panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
}
// Grant ALL privileges to "every object of every type".
const PRIVILEGE_ALL: u32 = 0xffff_ffff;
let res = sys_priv.insert(&(
ADMIN_ID,
PICO_SERVICE_ID,
"universe",
UNIVERSE_ID,
PRIVILEGE_ALL,
));
if let Err(e) = res {
panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
}
// Also grant the role "replication", because all privileges on "every object
// of every type" are not enough.
const PRIVILEGE_EXECUTE: u32 = 4;
let res = sys_priv.insert(&(
ADMIN_ID,
PICO_SERVICE_ID,
"role",
ROLE_REPLICATION_ID,
PRIVILEGE_EXECUTE,
));
if let Err(e) = res {
panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
}
}
////////////////////////////////////////////////////////////////////////////////
// RoutineDef
////////////////////////////////////////////////////////////////////////////////
/// Routine kind.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineKind {
#[default]
Procedure,
}
impl Display for RoutineKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RoutineKind::Procedure => write!(f, "procedure"),
}
}
}
/// Parameter mode.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineParamMode {
#[default]
In,
}
impl Display for RoutineParamMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RoutineParamMode::In => write!(f, "in"),
}
}
}
/// Routine parameter definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutineParamDef {
#[serde(default)]
pub mode: RoutineParamMode,
pub r#type: FieldType,
#[serde(default)]
pub default: Option<IrValue>,
}
impl Display for RoutineParamDef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "mode: {}, type: {}", self.mode, self.r#type)?;
if let Some(default) = &self.default {
write!(f, ", default: {}", default)?;
}
Ok(())
}
}
impl Default for RoutineParamDef {
fn default() -> Self {
Self {
mode: RoutineParamMode::default(),
r#type: FieldType::Scalar,
default: None,
}
}
}
impl RoutineParamDef {
pub fn with_type(self, r#type: FieldType) -> Self {
Self { r#type, ..self }
}
}
pub type RoutineParams = Vec<RoutineParamDef>;
pub type RoutineReturns = Vec<IrValue>;
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineLanguage {
#[default]
SQL,
}
impl From<Language> for RoutineLanguage {
fn from(language: Language) -> Self {
match language {
Language::SQL => Self::SQL,
}
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineSecurity {
#[default]
Invoker,
}
/// Routine definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutineDef {
pub id: u32,
pub name: String,
pub kind: RoutineKind,
pub params: RoutineParams,
pub returns: RoutineReturns,
pub language: RoutineLanguage,
pub body: String,
pub security: RoutineSecurity,
pub operable: bool,
pub schema_version: u64,
pub owner: UserId,
}
impl Encode for RoutineDef {}
impl RoutineDef {
/// Index (0-based) of field "operable" in _pico_routine table format.
pub const FIELD_OPERABLE: usize = 8;
/// Format of the _pico_routine global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)),
Field::from(("name", FieldType::String)),
Field::from(("kind", FieldType::String)),
Field::from(("params", FieldType::Array)),
Field::from(("returns", FieldType::Array)),
Field::from(("language", FieldType::String)),
Field::from(("body", FieldType::String)),
Field::from(("security", FieldType::String)),
Field::from(("operable", FieldType::Boolean)),
Field::from(("schema_version", FieldType::Unsigned)),
Field::from(("owner", FieldType::Unsigned)),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 16005,
name: "proc".into(),
kind: RoutineKind::Procedure,
params: vec![
RoutineParamDef {
mode: RoutineParamMode::In,
r#type: FieldType::String,
default: Some(IrValue::String("hello".into())),
},
RoutineParamDef {
mode: RoutineParamMode::In,
r#type: FieldType::Unsigned,
default: None,
},
],
returns: vec![],
language: RoutineLanguage::SQL,
body: "values (?), (?)".into(),
security: RoutineSecurity::Invoker,
operable: true,
schema_version: 421,
owner: 42,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// ...
////////////////////////////////////////////////////////////////////////////////
// TODO: this should be a TryFrom in tarantool-module
pub fn try_space_field_type_to_index_field_type(
ft: tarantool::space::FieldType,
) -> Option<tarantool::index::FieldType> {
use tarantool::index::FieldType as IFT;
use tarantool::space::FieldType as SFT;
let res = match ft {
SFT::Any => None,
SFT::Unsigned => Some(IFT::Unsigned),
SFT::String => Some(IFT::String),
SFT::Number => Some(IFT::Number),
SFT::Double => Some(IFT::Double),
SFT::Integer => Some(IFT::Integer),
SFT::Boolean => Some(IFT::Boolean),
SFT::Varbinary => Some(IFT::Varbinary),
SFT::Scalar => Some(IFT::Scalar),
SFT::Decimal => Some(IFT::Decimal),
SFT::Uuid => Some(IFT::Uuid),
SFT::Datetime => Some(IFT::Datetime),
SFT::Interval => None,
SFT::Array => Some(IFT::Array),
SFT::Map => None,
};
res
}
#[derive(Debug, thiserror::Error)]
pub enum DdlError {
#[error("{0}")]
CreateTable(#[from] CreateTableError),
#[error("ddl operation was aborted")]
Aborted,
#[error("there is no pending ddl operation")]
NoPendingDdl,
#[error("{0}")]
CreateRoutine(#[from] CreateRoutineError),
}
#[derive(Debug, thiserror::Error)]
pub enum CreateTableError {
#[error("space with id {id} exists with a different name '{actual_name}', but expected '{expected_name}'")]
ExistsWithDifferentName {
id: SpaceId,
expected_name: String,
actual_name: String,
},
#[error("several fields have the same name: {0}")]
DuplicateFieldName(String),
#[error("no field with name: {0}")]
FieldUndefined(String),
#[error("distribution is `sharded`, but neither `by_field` nor `sharding_key` is set")]
ShardingPolicyUndefined,
#[error("only one of sharding policy fields (`by_field`, `sharding_key`) should be set")]
ConflictingShardingPolicy,
#[error("global spaces only support memtx engine")]
IncompatibleGlobalSpaceEngine,
}
impl From<CreateTableError> for Error {
fn from(err: CreateTableError) -> Self {
DdlError::CreateTable(err).into()
}
}
#[derive(Debug, thiserror::Error)]
pub enum CreateRoutineError {
#[error("routine {name} already exists with a different kind")]
ExistsWithDifferentKind { name: String },
#[error("routine {name} already exists with different parameters")]
ExistsWithDifferentParams { name: String },
#[error("routine {name} already exists with a different language")]
ExistsWithDifferentLanguage { name: String },
#[error("routine {name} already exists with a different body")]
ExistsWithDifferentBody { name: String },
#[error("routine {name} already exists with a different security")]
ExistsWithDifferentSecurity { name: String },
#[error("routine {name} already exists with a different owner")]
ExistsWithDifferentOwner { name: String },
}
impl From<CreateRoutineError> for Error {
fn from(err: CreateRoutineError) -> Self {
DdlError::CreateRoutine(err).into()
}
}
// TODO: Add `LuaRead` to tarantool::space::Field and use it
#[derive(Clone, Debug, LuaRead)]
pub struct Field {
pub name: String, // TODO(gmoshkin): &str
pub r#type: FieldType,
pub is_nullable: bool,
}
impl From<Field> for tarantool::space::Field {
fn from(field: Field) -> Self {
tarantool::space::Field {
name: field.name,
field_type: field.r#type,
is_nullable: field.is_nullable,
}
}
}
::tarantool::define_str_enum! {
#[derive(Default)]
pub enum DistributionParam {
#[default]
Global = "global",
Sharded = "sharded",
}
}
#[derive(Clone, Debug)]
pub struct CreateProcParams {
pub name: String,
pub params: RoutineParams,
pub language: RoutineLanguage,
pub body: String,
pub security: RoutineSecurity,
pub owner: UserId,
}
impl CreateProcParams {
pub fn func_exists(&self) -> bool {
let func_space = Space::from(SystemSpace::Func);
let name_idx = func_space
.index_cached("name")
.expect("_function should have an index by name");
let t = name_idx
.get(&[&self.name])
.expect("reading from _function shouldn't fail");
t.is_some()
}
pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
let routine = storage.routines.by_name(&self.name)?;
if let Some(def) = routine {
if def.kind != RoutineKind::Procedure {
return Err(CreateRoutineError::ExistsWithDifferentKind { name: def.name })?;
}
if def.params != self.params {
return Err(CreateRoutineError::ExistsWithDifferentParams { name: def.name })?;
}
if def.language != self.language {
return Err(CreateRoutineError::ExistsWithDifferentLanguage { name: def.name })?;
}
if def.body != self.body {
return Err(CreateRoutineError::ExistsWithDifferentBody { name: def.name })?;
}
if def.security != self.security {
return Err(CreateRoutineError::ExistsWithDifferentSecurity { name: def.name })?;
}
if def.owner != self.owner {
return Err(CreateRoutineError::ExistsWithDifferentOwner { name: def.name })?;
}
}
Ok(())
}
}
#[derive(Clone, Debug, LuaRead)]
pub struct CreateTableParams {
pub(crate) id: Option<SpaceId>,
pub(crate) name: String,
pub(crate) format: Vec<Field>,
pub(crate) primary_key: Vec<String>,
pub(crate) distribution: DistributionParam,
pub(crate) by_field: Option<String>,
pub(crate) sharding_key: Option<Vec<String>>,
pub(crate) sharding_fn: Option<ShardingFn>,
pub(crate) engine: Option<SpaceEngineType>,
pub(crate) owner: UserId,
/// Timeout in seconds.
///
/// Specifies how long the user is ready to wait for the ddl to be applied.
/// It does not guarantee that the ddl will be aborted if waiting for the commit times out.
pub timeout: Option<f64>,
}
impl CreateTableParams {
/// Checks if the space described by these options already exists. Returns an error
/// if a space with the given id exists but has a different name.
pub fn space_exists(&self) -> traft::Result<bool> {
// The check is performed using `box.space` API, so that local spaces are counted too.
let sys_space = Space::from(SystemSpace::Space);
let Some(id) = self.id else {
let sys_space_by_name = sys_space
.index_cached("name")
.expect("_space should have an index by name");
let t = sys_space_by_name
.get(&[&self.name])
.expect("reading from _space shouldn't fail");
return Ok(t.is_some());
};
let t = sys_space
.get(&[id])
.expect("reading from _space shouldn't fail");
let Some(t) = t else {
return Ok(false);
};
let existing_name: &str = t.get("name").expect("space metadata should contain a name");
if existing_name == self.name {
return Ok(true);
} else {
// TODO: check everything else is the same
// https://git.picodata.io/picodata/picodata/picodata/-/issues/331
return Err(CreateTableError::ExistsWithDifferentName {
id,
expected_name: self.name.clone(),
actual_name: existing_name.into(),
}
.into());
}
}
pub fn validate(&self) -> traft::Result<()> {
// Check space id fits in the allowed range
if let Some(id) = self.id {
if id <= SPACE_ID_INTERNAL_MAX {
crate::tlog!(Warning, "requested space id {id} is in the range 0..={SPACE_ID_INTERNAL_MAX} reserved for future use by picodata, you may have a conflict in a future version");
}
}
// All field names are unique
let mut field_names = HashSet::new();
for field in &self.format {
if !field_names.insert(field.name.as_str()) {
return Err(CreateTableError::DuplicateFieldName(field.name.clone()).into());
}
}
// All primary key components exist in fields
for part in &self.primary_key {
if !field_names.contains(part.as_str()) {
return Err(CreateTableError::FieldUndefined(part.clone()).into());
}
}
// Global spaces must have memtx engine
if self.distribution == DistributionParam::Global
&& self.engine.is_some_and(|e| e != SpaceEngineType::Memtx)
{
return Err(CreateTableError::IncompatibleGlobalSpaceEngine.into());
}
// All sharding key components exist in fields
if self.distribution == DistributionParam::Sharded {
match (&self.by_field, &self.sharding_key) {
(Some(by_field), None) => {
if !field_names.contains(by_field.as_str()) {
return Err(CreateTableError::FieldUndefined(by_field.clone()).into());
}
if self.sharding_fn.is_some() {
crate::tlog!(
Warning,
"`sharding_fn` is specified but will be ignored, as sharding `by_field` is set"
);
}
}
(None, Some(sharding_key)) => {
let mut parts = HashSet::new();
for part in sharding_key {
if !field_names.contains(part.as_str()) {
return Err(CreateTableError::FieldUndefined(part.clone()).into());
}
// And all parts are unique
if !parts.insert(part.as_str()) {
return Err(CreateTableError::DuplicateFieldName(part.clone()).into());
}
}
}
(None, None) => return Err(CreateTableError::ShardingPolicyUndefined.into()),
(Some(_), Some(_)) => {
return Err(CreateTableError::ConflictingShardingPolicy.into())
}
}
} else {
if self.by_field.is_some() {
crate::tlog!(
Warning,
"`by_field` is specified but will be ignored, as `distribution` is `global`"
);
}
if self.sharding_key.is_some() {
crate::tlog!(
Warning,
"`sharding_key` is specified but will be ignored, as `distribution` is `global`"
);
}
if self.sharding_fn.is_some() {
crate::tlog!(
Warning,
"`sharding_fn` is specified but will be ignored, as `distribution` is `global`"
);
}
}
Ok(())
}
/// Create space and then rollback.
///
/// Should be used for checking if a space with these params can be created.
pub fn test_create_space(&self, storage: &Clusterwide) -> traft::Result<()> {
let id = self.id.expect("space id should've been chosen by now");
let user = storage
.users
.by_id(self.owner)?
.ok_or_else(|| Error::Other(format!("user with id {} not found", self.owner).into()))?
.name;
let err = transaction(|| -> Result<(), Option<tarantool::error::Error>> {
// TODO: allow create_space to accept user by id
::tarantool::schema::space::create_space(
&self.name,
&SpaceCreateOptions {
if_not_exists: false,
engine: self.engine.unwrap_or_default(),
id: Some(id),
field_count: self.format.len() as u32,
user: Some(user),
space_type: SpaceType::Normal,
format: Some(
self.format
.iter()
.cloned()
.map(tarantool::space::Field::from)
.collect(),
),
},
)
.map_err(Some)?;
// Rollback space creation
Err(None)
})
.unwrap_err();
match err {
// Space was successfully created and rolled back
TransactionError::RolledBack(None) => Ok(()),
// Space creation failed
TransactionError::RolledBack(Some(err)) => Err(err.into()),
// Error during commit or rollback
err => panic!("transaction mechanism should not fail: {err:?}"),
}
}
/// Chooses an id for the new space if it's not set yet and sets `self.id`.
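///
/// The id is picked by scanning the `_space` system space: starting from the
/// maximum id below [`SPACE_ID_TEMPORARY_MIN`] (but not below
/// `SPACE_ID_INTERNAL_MAX + 1`), the first unused id is taken. If that search
/// runs past the allowed range, it wraps around and retries from
/// `SPACE_ID_INTERNAL_MAX + 1`; if the whole range is occupied an error is
/// returned.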
pub fn choose_id_if_not_specified(&mut self) -> traft::Result<()> {
let sys_space = Space::from(SystemSpace::Space);
let id = if let Some(id) = self.id {
id
} else {
let id_range_min = SPACE_ID_INTERNAL_MAX + 1;
let id_range_max = SPACE_ID_TEMPORARY_MIN;
let mut iter = sys_space.select(IteratorType::LT, &[id_range_max])?;
let tuple = iter.next().expect("there's always at least system spaces");
let mut max_id: SpaceId = tuple
.field(0)
.expect("space metadata should decode fine")
.expect("space id should always be present");
let find_next_unused_id = |start: SpaceId| -> Result<SpaceId, Error> {
let iter = sys_space.select(IteratorType::GE, &[start])?;
let mut next_id = start;
for tuple in iter {
let id: SpaceId = tuple
.field(0)
.expect("space metadata should decode fine")
.expect("space id should always be present");
if id != next_id {
// Found a hole in the id range.
return Ok(next_id);
}
next_id += 1;
}
Ok(next_id)
};
if max_id < id_range_min {
max_id = id_range_min;
}
let mut id = find_next_unused_id(max_id)?;
if id >= id_range_max {
id = find_next_unused_id(id_range_min)?;
if id >= id_range_max {
set_error!(TarantoolErrorCode::CreateSpace, "space id limit is reached");
return Err(TarantoolError::last().into());
}
}
id
};
self.id = Some(id);
Ok(())
}
pub fn into_ddl(self) -> traft::Result<Ddl> {
let id = self.id.expect("space id should've been chosen by now");
let primary_key: Vec<_> = self.primary_key.into_iter().map(Part::field).collect();
let format: Vec<_> = self
.format
.into_iter()
.map(tarantool::space::Field::from)
.collect();
let distribution = match self.distribution {
DistributionParam::Global => Distribution::Global,
DistributionParam::Sharded => {
if let Some(field) = self.by_field {
Distribution::ShardedByField { field }
} else {
Distribution::ShardedImplicitly {
sharding_key: self
.sharding_key
.expect("should be checked during `validate`"),
sharding_fn: self.sharding_fn.unwrap_or_default(),
}
}
}
};
let res = Ddl::CreateTable {
id,
name: self.name,
format,
primary_key,
distribution,
engine: self.engine.unwrap_or_default(),
owner: self.owner,
};
Ok(res)
}
}
/// Minimum id in the range of ids reserved for temporary spaces.
///
/// Temporary spaces need a special range of ids to avoid conflicts with
/// spaces defined on replicas. This value is defined in tarantool, see
/// <https://git.picodata.io/picodata/tarantool/-/blob/5c3c8ed32c7a9c84a0e86c8453269f0925ce63ed/src/box/schema_def.h#L67>
const SPACE_ID_TEMPORARY_MIN: SpaceId = 1 << 30;
/// Waits for a pending ddl to be either `Committed` or `Aborted` by the governor.
///
/// Returns the index of the corresponding `DdlCommit` entry.
///
/// Returns an error if `timeout` is reached first.
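///
/// A hedged usage sketch (not compiled; `op` and `predicate` are placeholders,
/// and the index of the proposed ddl entry is assumed to come from a
/// compare-and-swap call like the one in [`abort_ddl`]):
///
/// ```ignore
/// let (prepare_index, _term) = compare_and_swap(op, predicate, effective_user_id(), timeout)?;
/// let commit_index = wait_for_ddl_commit(prepare_index, timeout)?;
/// ```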
pub fn wait_for_ddl_commit(
prepare_commit: RaftIndex,
timeout: Duration,
) -> traft::Result<RaftIndex> {
let node = node::global()?;
let raft_storage = &node.raft_storage;
let deadline = fiber::clock().saturating_add(timeout);
let last_seen = prepare_commit;
loop {
let cur_applied = node.get_index();
let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1, None)?;
for entry in new_entries {
if entry.entry_type != raft::prelude::EntryType::EntryNormal {
continue;
}
let index = entry.index;
let op = entry.into_op().unwrap_or(Op::Nop);
match op {
Op::DdlCommit => return Ok(index),
Op::DdlAbort => return Err(DdlError::Aborted.into()),
_ => (),
}
}
node.wait_index(cur_applied + 1, deadline.duration_since(fiber::clock()))?;
}
}
/// Aborts a pending DDL operation and waits for the abort to be committed locally.
/// Returns an error if `timeout` is reached first.
///
/// Returns the index of the corresponding DdlAbort raft entry, or an error if
/// there is no pending DDL operation.
pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> {
let node = node::global()?;
loop {
if node.storage.properties.pending_schema_change()?.is_none() {
return Err(DdlError::NoPendingDdl.into());
}
let index = node.get_index();
let term = raft::Storage::term(&node.raft_storage, index)?;
#[rustfmt::skip]
let predicate = cas::Predicate {
index,
term,
ranges: vec![
cas::Range::new(ClusterwideTable::Property).eq([PropertyName::PendingSchemaChange]),
cas::Range::new(ClusterwideTable::Property).eq([PropertyName::GlobalSchemaVersion]),
cas::Range::new(ClusterwideTable::Property).eq([PropertyName::NextSchemaVersion]),
],
};
let (index, term) =
compare_and_swap(Op::DdlAbort, predicate, effective_user_id(), timeout)?;
node.wait_index(index, timeout)?;
if raft::Storage::term(&node.raft_storage, index)? != term {
// leader switched - retry
continue;
}
return Ok(index);
}
}
mod tests {
use tarantool::{auth::AuthMethod, space::FieldType};
use super::*;
fn storage() -> Clusterwide {
let storage = Clusterwide::for_tests();
storage
.users
.insert(&UserDef {
id: ADMIN_ID,
name: String::from("admin"),
schema_version: 0,
auth: AuthDef::new(AuthMethod::ChapSha1, String::from("")),
owner: ADMIN_ID,
})
.unwrap();
storage
}
#[::tarantool::test]
fn test_create_space() {
let storage = storage();
CreateTableParams {
id: Some(1337),
name: "friends_of_peppa".into(),
format: vec![
Field {
name: "id".into(),
r#type: FieldType::Number,
is_nullable: false,
},
Field {
name: "name".into(),
r#type: FieldType::String,
is_nullable: false,
},
],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.test_create_space(&storage)
.unwrap();
assert!(tarantool::space::Space::find("friends_of_peppa").is_none());
CreateTableParams {
id: Some(1337),
name: "friends_of_peppa".into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: Some(SpaceEngineType::Vinyl),
timeout: None,
owner: ADMIN_ID,
}
.test_create_space(&storage)
.unwrap();
assert!(tarantool::space::Space::find("friends_of_peppa").is_none());
let err = CreateTableParams {
id: Some(0),
name: "friends_of_peppa".into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.test_create_space(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"tarantool error: CreateSpace: Failed to create space 'friends_of_peppa': space id 0 is reserved"
);
}
#[::tarantool::test]
fn ddl() {
let new_space = "new_space";
let new_id = 1;
let field1 = Field {
name: "field1".into(),
r#type: FieldType::Any,
is_nullable: false,
};
let field2 = Field {
name: "field2".into(),
r#type: FieldType::Any,
is_nullable: false,
};
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone(), field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "several fields have the same name: field1");
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![field2.name.clone()],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "no field with name: field2");
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: Some(vec![field2.name.clone()]),
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "no field with name: field2");
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]),
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "several fields have the same name: field1");
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "no field with name: field2");
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"distribution is `sharded`, but neither `by_field` nor `sharding_key` is set"
);
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: Some(vec![]),
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"only one of sharding policy fields (`by_field`, `sharding_key`) should be set"
);
CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone(), field2.clone()],
primary_key: vec![field2.name.clone()],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap();
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1, field2.clone()],
primary_key: vec![field2.name],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: Some(SpaceEngineType::Vinyl),
timeout: None,
owner: ADMIN_ID,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "global spaces only support memtx engine");
}
#[::tarantool::test]
fn test_space_id_temporary_min() {
let lua = tarantool::lua_state();
let id: SpaceId = lua
.eval("return box.schema.SPACE_ID_TEMPORARY_MIN")
.unwrap();
assert_eq!(id, SPACE_ID_TEMPORARY_MIN);
}
}
#[cfg(test)]
mod test {
use super::*;
use tarantool::tuple::ToTupleBuffer;
#[test]
#[rustfmt::skip]
fn space_def_matches_format() {
let i = TableDef::for_tests();
let tuple_data = i.to_tuple_buffer().unwrap();
let format = TableDef::format();
crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "TableDef::format");
assert_eq!(format[TableDef::FIELD_OPERABLE].name, "operable");
}
#[test]
#[rustfmt::skip]
fn index_def_matches_format() {
let i = IndexDef::for_tests();
let tuple_data = i.to_tuple_buffer().unwrap();
let format = IndexDef::format();
crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "IndexDef::format");
assert_eq!(format[IndexDef::FIELD_OPERABLE].name, "operable");
}
#[test]
#[rustfmt::skip]
fn user_def_matches_format() {
let i = UserDef::for_tests();
let tuple_data = i.to_tuple_buffer().unwrap();
let format = UserDef::format();
crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "UserDef::format");
assert_eq!(format[UserDef::FIELD_AUTH].name, "auth");
}
#[test]
#[rustfmt::skip]
fn role_def_matches_format() {
let i = RoleDef::for_tests();
let tuple_data = i.to_tuple_buffer().unwrap();
let format = RoleDef::format();
crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "RoleDef::format");
}
#[test]
#[rustfmt::skip]
fn privilege_def_matches_format() {
let i = PrivilegeDef::for_tests();
let tuple_data = i.to_tuple_buffer().unwrap();
let format = PrivilegeDef::format();
crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PrivilegeDef::format");
}
#[track_caller]
fn check_object_privilege(
object_type: SchemaObjectType,
valid_privileges: &'static [PrivilegeType],
object_id: i64,
) {
for privilege in PrivilegeType::VARIANTS {
let res = PrivilegeDef::new(*privilege, object_type, object_id, 1, 1, 1);
if valid_privileges.contains(privilege) {
assert!(res.is_ok())
} else {
assert_eq!(
res.unwrap_err(),
InvalidPrivilegeError {
object_type,
unsupported: *privilege,
expected_one_of: valid_privileges,
}
)
}
}
}
#[test]
fn privilege_def_validation() {
use PrivilegeType::*;
// table
let valid = &[Read, Write, Create, Alter, Drop];
check_object_privilege(SchemaObjectType::Table, valid, -1);
// particular table
let valid = &[Read, Write, Alter, Drop];
check_object_privilege(SchemaObjectType::Table, valid, 42);
// user
let valid = &[Create, Alter, Drop];
check_object_privilege(SchemaObjectType::User, valid, -1);
// particular user
let valid = &[Alter, Drop];
check_object_privilege(SchemaObjectType::User, valid, 42);
// role
let valid = &[Create, Drop];
check_object_privilege(SchemaObjectType::Role, valid, -1);
// particular role
let valid = &[Execute, Drop];
check_object_privilege(SchemaObjectType::Role, valid, 42);
// universe
let valid = &[Login];
check_object_privilege(SchemaObjectType::Universe, valid, 0);
}
#[test]
fn routine_def_matches_format() {
let i = RoutineDef::for_tests();
let tuple_data = i.to_tuple_buffer().unwrap();
let format = RoutineDef::format();
crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "RoutineDef::format");
}
}