-
Georgy Moshkin authoredGeorgy Moshkin authored
schema.rs 100.11 KiB
use ahash::AHashSet;
use sbroad::ir::ddl::{Language, ParamDef};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::time::Duration;
use tarantool::decimal::Decimal;
use tarantool::index::{FieldType as IndexFieldType, IndexType, Part, RtreeIndexDistanceType};
use tarantool::auth::AuthData;
use tarantool::auth::AuthDef;
use tarantool::auth::AuthMethod;
use tarantool::error::TarantoolError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::msgpack;
use tarantool::session::{with_su, UserId};
use tarantool::set_error;
use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
use tarantool::space::{Metadata as SpaceMetadata, Space, SpaceType, SystemSpace};
use tarantool::transaction::{transaction, TransactionError};
use tarantool::{
index::IndexId,
index::IteratorType,
index::Metadata as IndexMetadata,
space::SpaceId,
tlua::{self, LuaRead},
tuple::Encode,
util::{NumOrStr, Value},
};
use sbroad::ir::value::Value as IrValue;
use serde::{Deserialize, Serialize};
use crate::access_control::UserMetadataKind;
use crate::cas::{self, compare_and_swap, Request};
use crate::config::DEFAULT_USERNAME;
use crate::instance::InstanceId;
use crate::pico_service::pico_service_password;
use crate::plugin::PluginIdentifier;
use crate::storage::{self, RoutineId, ToEntryIter};
use crate::storage::{Clusterwide, SPACE_ID_INTERNAL_MAX};
use crate::storage::{ClusterwideTable, PropertyName};
use crate::tier::DEFAULT_TIER;
use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op};
use crate::traft::{self, node, RaftIndex};
use crate::util::effective_user_id;
/// The initial local schema version. Immediately after the cluster is booted
/// it has this schema version.
///
/// If a schema definition is marked with this version, it means the schema
/// definition is builtin and is applied by default on all instances.
pub const INITIAL_SCHEMA_VERSION: u64 = 0;
////////////////////////////////////////////////////////////////////////////////
// TableDef
////////////////////////////////////////////////////////////////////////////////
/// Database table definition.
///
/// Describes a user-defined table. Builtin system tables are described with
/// this type as well (see [`system_table_definitions`]).
///
/// Records of the _pico_table global table have the layout given by
/// [`TableDef::format`]. NOTE(review): the field order below appears to mirror
/// that format one-to-one (see [`TableDef::FIELD_OPERABLE`]) — keep them in sync.
#[derive(Clone, Debug, msgpack::Encode, msgpack::Decode, PartialEq, Eq)]
pub struct TableDef {
    /// Table (tarantool space) id.
    pub id: SpaceId,
    /// Table name.
    pub name: String,
    /// How the table's tuples are distributed across replicasets.
    pub distribution: Distribution,
    /// Definitions of the table's fields.
    pub format: Vec<tarantool::space::Field>,
    /// Schema version this definition belongs to.
    /// [`INITIAL_SCHEMA_VERSION`] marks a builtin definition.
    pub schema_version: u64,
    /// NOTE(review): presumably `false` while a DDL operation affecting this
    /// table has not been fully applied yet — confirm against the DDL code.
    pub operable: bool,
    /// Storage engine of the underlying space.
    pub engine: SpaceEngineType,
    /// Id of the user owning the table; it becomes the space's `user_id`
    /// in tarantool metadata (see [`TableDef::to_space_metadata`]).
    pub owner: UserId,
    /// Human-readable description of the table.
    pub description: String,
}
impl TableDef {
/// Index (0-based) of field "operable" in the _pico_table table format.
pub const FIELD_OPERABLE: usize = 5;
/// Format of the _pico_table global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)).is_nullable(false),
Field::from(("name", FieldType::String)).is_nullable(false),
Field::from(("distribution", FieldType::Map)).is_nullable(false),
Field::from(("format", FieldType::Array)).is_nullable(false),
Field::from(("schema_version", FieldType::Unsigned)).is_nullable(false),
Field::from(("operable", FieldType::Boolean)).is_nullable(false),
Field::from(("engine", FieldType::String)).is_nullable(false),
Field::from(("owner", FieldType::Unsigned)).is_nullable(false),
Field::from(("description", FieldType::String)).is_nullable(false),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 10569,
name: "stuff".into(),
distribution: Distribution::Global,
format: vec![tarantool::space::Field {
name: "field_1".into(),
field_type: tarantool::space::FieldType::Unsigned,
is_nullable: false,
}],
schema_version: 420,
operable: true,
engine: SpaceEngineType::Blackhole,
owner: 42,
description: "A table for tests".into(),
}
}
pub fn to_space_metadata(&self) -> traft::Result<SpaceMetadata> {
let format = fields_to_format(&self.format);
let mut flags = BTreeMap::new();
if matches!(self.distribution, Distribution::Global) {
flags.insert("group_id".into(), 1.into());
}
let space_def = SpaceMetadata {
id: self.id,
user_id: self.owner,
name: self.name.as_str().into(),
engine: self.engine,
field_count: 0,
flags,
format,
};
Ok(space_def)
}
}
/// Definitions of builtin tables & their respective indexes.
/// These should be inserted into "_pico_table" & "_pico_index" at cluster bootstrap.
///
/// Every builtin table is global, uses the memtx engine, is owned by
/// [`ADMIN_ID`] and is operable. It is marked with [`INITIAL_SCHEMA_VERSION`],
/// which means the local schema is already up to date and the main loop
/// doesn't need to do anything for it.
pub fn system_table_definitions() -> Vec<(TableDef, Vec<IndexDef>)> {
    // TODO: there's also "_raft_log" & "_raft_state" spaces, but we don't treat
    // them the same as others for some reason?
    ClusterwideTable::all_tables()
        .into_iter()
        .map(|table| {
            let def = TableDef {
                id: table.id(),
                name: table.name().into(),
                distribution: Distribution::Global,
                format: table.format(),
                schema_version: INITIAL_SCHEMA_VERSION,
                operable: true,
                engine: SpaceEngineType::Memtx,
                owner: ADMIN_ID,
                description: table.description(),
            };
            (def, table.index_definitions())
        })
        .collect()
}
// FIXME: move this to tarantool-module
/// Converts field definitions into tarantool's space-format representation.
///
/// Each field becomes a map with the keys "name", "type" and "is_nullable".
/// String values borrow from `fields`.
pub fn fields_to_format(
    fields: &[tarantool::space::Field],
) -> Vec<BTreeMap<Cow<'static, str>, Value<'_>>> {
    fields
        .iter()
        .map(|field| {
            BTreeMap::from([
                (Cow::Borrowed("name"), Value::Str(field.name.as_str().into())),
                (Cow::Borrowed("type"), Value::Str(field.field_type.as_str().into())),
                (Cow::Borrowed("is_nullable"), Value::Bool(field.is_nullable)),
            ])
        })
        .collect()
}
////////////////////////////////////////////////////////////////////////////////
// Distribution
////////////////////////////////////////////////////////////////////////////////
/// Defines how to distribute tuples in a table across replicasets.
///
/// Serialized as a map whose "kind" key holds the snake_case variant name.
#[derive(
    Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead, msgpack::Encode, msgpack::Decode,
)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "kind")]
pub enum Distribution {
    /// Tuples will be replicated to each instance.
    Global,
    /// Tuples will be implicitly sharded, i.e. sent to the corresponding bucket,
    /// which is determined by a hash of the provided `sharding_key`.
    ShardedImplicitly {
        /// Sharding key whose values are hashed to pick the bucket
        /// (NOTE(review): presumably a list of table field names — confirm).
        sharding_key: Vec<String>,
        /// Hash function to use; [`ShardingFn::Murmur3`] when the value is
        /// absent during deserialization.
        #[serde(default)]
        sharding_fn: ShardingFn,
        /// NOTE(review): presumably the name of the tier whose replicasets
        /// store the table — confirm.
        tier: String,
    },
    /// Tuples will be explicitly sharded, i.e. sent to the bucket
    /// whose id is stored in the field specified here.
    ///
    /// Default field name: "bucket_id"
    ShardedByField {
        /// Name of the field holding the bucket id; "bucket_id" when the
        /// value is absent during deserialization.
        #[serde(default = "default_bucket_id_field")]
        field: String,
        /// NOTE(review): presumably the name of the tier whose replicasets
        /// store the table — confirm.
        tier: String,
    },
}
/// Serde default for [`Distribution::ShardedByField::field`].
fn default_bucket_id_field() -> String {
    String::from("bucket_id")
}
::tarantool::define_str_enum! {
    /// Hash function used to map a tuple of an implicitly sharded table to a
    /// bucket (see [`Distribution::ShardedImplicitly`]). `Murmur3` is the
    /// default.
    ///
    /// Custom sharding functions are not yet supported.
    #[derive(Default)]
    pub enum ShardingFn {
        Crc32 = "crc32",
        #[default]
        Murmur3 = "murmur3",
        Xxhash = "xxhash",
        Md5 = "md5",
    }
}
////////////////////////////////////////////////////////////////////////////////
// IndexDef
////////////////////////////////////////////////////////////////////////////////
/// Picodata index options.
///
/// While the tarantool module already includes an `IndexOptions` structure
/// for local indexes, this enum is designed for picodata indexes.
/// Currently the two sets of options coincide, but we intend to introduce a
/// REDISTRIBUTION option for secondary indexes. It will be implemented
/// using materialized views instead of tarantool indexes. Therefore,
/// it's important to keep these types distinct and to emphasize that this
/// one is specific to picodata.
#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq)]
pub enum IndexOption {
    /// Vinyl only. The false positive rate for the bloom filter.
    #[serde(rename = "bloom_fpr")]
    BloomFalsePositiveRate(Decimal),
    /// The RTREE index dimension.
    #[serde(rename = "dimension")]
    Dimension(u32),
    /// The RTREE index distance type.
    #[serde(rename = "distance")]
    Distance(RtreeIndexDistanceType),
    /// Specify whether hint optimization is enabled for the TREE index.
    /// If true, the index works faster, if false, the index size is reduced by half.
    #[serde(rename = "hint")]
    // FIXME: this option is disabled in the current version of the module.
    Hint(bool),
    /// Vinyl only. The page size in bytes used for read and write disk operations.
    #[serde(rename = "page_size")]
    PageSize(u32),
    /// Vinyl only. The range size in bytes used for vinyl index.
    #[serde(rename = "range_size")]
    RangeSize(u32),
    /// Vinyl only. The number of runs per level in the LSM tree.
    #[serde(rename = "run_count_per_level")]
    RunCountPerLevel(u32),
    /// Vinyl only. The ratio between the size of different levels in the LSM tree.
    #[serde(rename = "run_size_ratio")]
    RunSizeRatio(Decimal),
    /// Specify whether the index is unique. When true, the index cannot contain duplicate values.
    #[serde(rename = "unique")]
    Unique(bool),
}
impl IndexOption {
pub fn as_kv(&self) -> (Cow<'static, str>, Value<'_>) {
let dec_to_f64 = |d: Decimal| f64::from_str(&d.to_string()).expect("decimal to f64");
match self {
IndexOption::BloomFalsePositiveRate(rate) => {
("bloom_fpr".into(), Value::Double(dec_to_f64(*rate)))
}
IndexOption::Dimension(dim) => ("dimension".into(), Value::Num(*dim)),
IndexOption::Distance(dist) => ("distance".into(), Value::Str(dist.as_str().into())),
IndexOption::Hint(hint) => ("hint".into(), Value::Bool(*hint)),
IndexOption::PageSize(size) => ("page_size".into(), Value::Num(*size)),
IndexOption::RangeSize(size) => ("range_size".into(), Value::Num(*size)),
IndexOption::RunCountPerLevel(count) => {
("run_count_per_level".into(), Value::Num(*count))
}
IndexOption::RunSizeRatio(ratio) => {
("run_size_ratio".into(), Value::Double(dec_to_f64(*ratio)))
}
IndexOption::Unique(unique) => ("unique".into(), Value::Bool(*unique)),
}
}
pub fn is_vinyl(&self) -> bool {
matches!(
self,
IndexOption::BloomFalsePositiveRate(_)
| IndexOption::PageSize(_)
| IndexOption::RangeSize(_)
| IndexOption::RunCountPerLevel(_)
| IndexOption::RunSizeRatio(_)
)
}
pub fn is_rtree(&self) -> bool {
matches!(self, IndexOption::Dimension(_) | IndexOption::Distance(_))
}
pub fn is_tree(&self) -> bool {
matches!(self, IndexOption::Hint(_))
}
pub fn type_name(&self) -> &str {
match self {
IndexOption::BloomFalsePositiveRate(_) => "bloom_fpr",
IndexOption::Dimension(_) => "dimension",
IndexOption::Distance(_) => "distance",
IndexOption::Hint(_) => "hint",
IndexOption::PageSize(_) => "page_size",
IndexOption::RangeSize(_) => "range_size",
IndexOption::RunCountPerLevel(_) => "run_count_per_level",
IndexOption::RunSizeRatio(_) => "run_size_ratio",
IndexOption::Unique(_) => "unique",
}
}
}
/// Database index definition.
///
/// Describes a user-defined index. Records of the _pico_index global table
/// have the layout given by [`IndexDef::format`].
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct IndexDef {
    /// Id of the table (space) the index belongs to.
    pub table_id: SpaceId,
    /// Index id.
    pub id: IndexId,
    /// Index name.
    pub name: String,
    /// Index type; serialized under the key "type".
    #[serde(rename = "type")]
    pub ty: IndexType,
    /// Index options.
    pub opts: Vec<IndexOption>,
    /// Index parts. A part may refer to a field either by number or by name;
    /// names are resolved to field numbers by [`IndexDef::to_index_metadata`].
    pub parts: Vec<Part>,
    /// NOTE(review): presumably `false` while a DDL operation affecting this
    /// index has not been fully applied yet — confirm against the DDL code.
    pub operable: bool,
    /// Schema version this definition belongs to.
    pub schema_version: u64,
}
// Lets `IndexDef` be used where tarantool's `Encode` trait is required.
impl Encode for IndexDef {}
impl IndexDef {
/// Index (0-based) of field "operable" in _pico_index table format.
pub const FIELD_OPERABLE: usize = 6;
/// Format of the _pico_index global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("table_id", FieldType::Unsigned)).is_nullable(false),
Field::from(("id", FieldType::Unsigned)).is_nullable(false),
Field::from(("name", FieldType::String)).is_nullable(false),
Field::from(("type", FieldType::String)).is_nullable(false),
Field::from(("opts", FieldType::Array)).is_nullable(false),
Field::from(("parts", FieldType::Array)).is_nullable(false),
Field::from(("operable", FieldType::Boolean)).is_nullable(false),
Field::from(("schema_version", FieldType::Unsigned)).is_nullable(false),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
table_id: 10569,
id: 1,
name: "secondary".into(),
ty: IndexType::Tree,
opts: vec![],
parts: vec![],
operable: true,
schema_version: 420,
}
}
pub fn to_index_metadata(&self, table_def: &TableDef) -> IndexMetadata {
let mut opts = BTreeMap::new();
for opt in &self.opts {
let (key, value) = opt.as_kv();
opts.insert(key, value);
}
// We must convert any field names to field indexes in the index parts,
// because it is very important for tarantool, and we are very
// understanding and supportive of it.
let mut parts = self.parts.clone();
for part in &mut parts {
let NumOrStr::Str(field_name) = &part.field else {
continue;
};
let mut index = 0;
for field in &table_def.format {
if &field.name == field_name {
break;
}
index += 1;
}
// No need to check the field was found,
// tarantool will tell us if the field index is out of range
part.field = NumOrStr::Num(index as _);
}
let index_meta = IndexMetadata {
space_id: self.table_id,
index_id: self.id,
name: self.name.as_str().into(),
r#type: self.ty,
opts,
parts,
};
index_meta
}
pub fn opts_equal(&self, opts: &[IndexOption]) -> bool {
let mut set: AHashSet<&IndexOption> = AHashSet::with_capacity(opts.len());
for opt in &self.opts {
set.insert(opt);
}
for opt in opts {
if !set.contains(opt) {
return false;
}
}
true
}
}
////////////////////////////////////////////////////////////////////////////////
// PluginDef
////////////////////////////////////////////////////////////////////////////////
/// Plugin definition in _pico_plugin system table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PluginDef {
    /// Plugin name.
    pub name: String,
    /// Indicates whether the plugin is enabled.
    pub enabled: bool,
    /// List of plugin services.
    pub services: Vec<String>,
    /// Plugin version.
    // TODO: currently this version is unused, except by the service loading
    // algorithm (where the version from the .so file must be equal to the
    // plugin version from the manifest).
    // This will change in the future, with API-breaking changes and the plugin
    // rolling update feature.
    pub version: String,
    /// Plugin description.
    pub description: String,
    /// List of migration files.
    pub migration_list: Vec<String>,
}
// Lets `PluginDef` be used where tarantool's `Encode` trait is required.
impl Encode for PluginDef {}
impl PluginDef {
/// Index (0-based) of field "enable" in the _pico_plugin table format.
pub const FIELD_ENABLE: usize = 1;
/// Format of the _pico_plugin global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("name", FieldType::String)).is_nullable(false),
Field::from(("enabled", FieldType::Boolean)).is_nullable(false),
Field::from(("services", FieldType::Array)).is_nullable(false),
Field::from(("version", FieldType::String)).is_nullable(false),
Field::from(("description", FieldType::String)).is_nullable(false),
Field::from(("migration_list", FieldType::Array)).is_nullable(false),
]
}
#[cfg(test)]
pub fn for_tests() -> Self {
Self {
name: "plugin".into(),
enabled: false,
services: vec!["service_1".to_string(), "service_2".to_string()],
version: "0.0.1".into(),
description: "description".to_string(),
migration_list: vec![],
}
}
#[inline]
pub fn identifier(&self) -> PluginIdentifier {
PluginIdentifier {
name: self.name.clone(),
version: self.version.clone(),
}
}
#[inline]
pub fn into_identifier(self) -> PluginIdentifier {
PluginIdentifier {
name: self.name,
version: self.version,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// ServiceDef
////////////////////////////////////////////////////////////////////////////////
/// Plugin service definition in _pico_service system table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ServiceDef {
    /// Name of the plugin the service belongs to.
    pub plugin_name: String,
    /// Service name.
    pub name: String,
    /// Service version must be the same as a plugin version.
    pub version: String,
    /// List of tiers where service must be running.
    // FIXME: for future improvements
    pub tiers: Vec<String>,
    /// Description of the service.
    pub description: String,
}
// Lets `ServiceDef` be used where tarantool's `Encode` trait is required.
impl Encode for ServiceDef {}
impl ServiceDef {
/// Format of the _pico_service global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("plugin_name", FieldType::String)).is_nullable(false),
Field::from(("name", FieldType::String)).is_nullable(false),
Field::from(("version", FieldType::String)).is_nullable(false),
Field::from(("tiers", FieldType::Array)).is_nullable(false),
Field::from(("description", FieldType::String)).is_nullable(false),
]
}
#[cfg(test)]
pub fn for_tests() -> Self {
Self {
plugin_name: "plugin".to_string(),
name: "service".into(),
version: "0.0.1".into(),
tiers: vec!["t1".to_string(), "t2".to_string()],
description: "description".to_string(),
}
}
}
////////////////////////////////////////////////////////////////////////////////
// ServiceRouteItem
////////////////////////////////////////////////////////////////////////////////
/// Single route definition in _pico_service_route system table.
///
/// Binds a service of a specific plugin version to an instance.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ServiceRouteItem {
    /// Instance id.
    pub instance_id: InstanceId,
    /// Plugin name.
    pub plugin_name: String,
    /// Plugin version.
    pub plugin_version: String,
    /// Service name.
    pub service_name: String,
    /// `true` if route is poisoned, `false` otherwise.
    pub poison: bool,
}
// Lets `ServiceRouteItem` be used where tarantool's `Encode` trait is required.
impl Encode for ServiceRouteItem {}
impl ServiceRouteItem {
/// Index of field "poison" in the table _pico_service_route format.
///
/// Index of first field is 0.
pub const FIELD_POISON: u32 = 4;
pub fn new_healthy(
instance_id: InstanceId,
plugin_ident: &PluginIdentifier,
service_name: impl ToString,
) -> Self {
Self {
instance_id,
plugin_name: plugin_ident.name.clone(),
plugin_version: plugin_ident.version.to_string(),
service_name: service_name.to_string(),
poison: false,
}
}
pub fn new_poison(
instance_id: InstanceId,
plugin_ident: &PluginIdentifier,
service_name: impl ToString,
) -> Self {
Self {
instance_id,
plugin_name: plugin_ident.name.clone(),
plugin_version: plugin_ident.version.to_string(),
service_name: service_name.to_string(),
poison: true,
}
}
/// Format of the _pico_service_route global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("instance_id", FieldType::String)).is_nullable(false),
Field::from(("plugin_name", FieldType::String)).is_nullable(false),
Field::from(("plugin_version", FieldType::String)).is_nullable(false),
Field::from(("service_name", FieldType::String)).is_nullable(false),
Field::from(("poison", FieldType::Boolean)).is_nullable(false),
]
}
#[cfg(test)]
pub fn for_tests() -> Self {
Self {
instance_id: InstanceId("i1".to_string()),
plugin_name: "plugin".to_string(),
plugin_version: "version".to_string(),
service_name: "service".to_string(),
poison: false,
}
}
pub fn key(&self) -> ServiceRouteKey {
ServiceRouteKey {
instance_id: &self.instance_id,
plugin_name: &self.plugin_name,
plugin_version: &self.plugin_version,
service_name: &self.service_name,
}
}
}
/// Borrowed key of a [`ServiceRouteItem`]: every field except `poison`
/// (see [`ServiceRouteItem::key`]).
///
/// NOTE(review): presumably used as the primary key of _pico_service_route —
/// confirm against the table's index definitions.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ServiceRouteKey<'a> {
    /// Instance id.
    pub instance_id: &'a InstanceId,
    /// Plugin name.
    pub plugin_name: &'a str,
    /// Plugin version.
    pub plugin_version: &'a str,
    /// Service name.
    pub service_name: &'a str,
}
// Lets `ServiceRouteKey` be used where tarantool's `Encode` trait is required.
impl<'a> Encode for ServiceRouteKey<'a> {}
/// Single record in _pico_plugin_migration system table.
///
/// Records that a migration file of a plugin has been applied.
/// NOTE(review): "applied" is inferred from the table name — confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PluginMigrationRecord {
    /// Plugin name.
    pub plugin_name: String,
    /// Migration file path.
    pub migration_file: String,
    /// MD5 of a migration file content, represented by a hex string.
    /// Read it through [`PluginMigrationRecord::hash`].
    hash: String,
}
// Lets `PluginMigrationRecord` be used where tarantool's `Encode` trait is required.
impl Encode for PluginMigrationRecord {}
impl PluginMigrationRecord {
    /// Format of the _pico_plugin_migration global table.
    ///
    /// All fields are non-nullable strings.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        let required_string =
            |name: &'static str| Field::from((name, FieldType::String)).is_nullable(false);
        vec![
            required_string("plugin_name"),
            required_string("migration_file"),
            required_string("hash"),
        ]
    }

    /// Hex-encoded MD5 digest of the migration file content.
    #[inline(always)]
    pub fn hash(&self) -> &str {
        self.hash.as_str()
    }

    #[cfg(test)]
    pub fn for_tests() -> Self {
        let digest = md5::compute("test");
        Self {
            plugin_name: String::from("plugin"),
            migration_file: String::from("migration_1.db"),
            hash: format!("{digest:x}"),
        }
    }
}
/// Single record in _pico_plugin_config system table.
///
/// Holds one key of the configuration of a plugin's service or extension.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PluginConfigRecord {
    /// Plugin name.
    pub plugin: String,
    /// Plugin version.
    pub version: String,
    /// Plugin service or extension name.
    pub entity: String,
    /// Configuration key.
    pub key: String,
    /// Configuration value (any msgpack value; the table field is nullable).
    pub value: rmpv::Value,
}
// Lets `PluginConfigRecord` be used where tarantool's `Encode` trait is required.
impl Encode for PluginConfigRecord {}
impl PluginConfigRecord {
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("plugin", FieldType::String)).is_nullable(false),
Field::from(("version", FieldType::String)).is_nullable(false),
Field::from(("entity", FieldType::String)).is_nullable(false),
Field::from(("key", FieldType::String)).is_nullable(false),
Field::from(("value", FieldType::Any)).is_nullable(true),
]
}
/// Create records from the whole configuration.
pub fn from_config(
ident: &PluginIdentifier,
entity: &str,
config: rmpv::Value,
) -> tarantool::Result<Vec<Self>> {
let mut result = vec![];
let map = match config {
rmpv::Value::Nil => return Ok(result),
rmpv::Value::Map(map) => map,
_ => {
let err = format!(
"Plugin configuration should be represented as messagepack map. Got: {config:?}"
);
return Err(tarantool::error::Error::DecodeRmpValue(
serde::de::Error::custom(err),
));
}
};
for (k, v) in map {
let key = k
.as_str()
.ok_or(<rmp_serde::decode::Error as serde::de::Error>::custom(
"Only string keys allowed in plugin configuration",
))?;
result.push(Self {
plugin: ident.name.to_string(),
version: ident.version.to_string(),
entity: entity.to_string(),
key: key.to_string(),
value: v,
});
}
Ok(result)
}
#[inline]
pub fn pk(&self) -> [&str; 4] {
[&self.plugin, &self.version, &self.entity, &self.key]
}
#[cfg(test)]
pub fn for_tests() -> Self {
Self {
plugin: "plugin".to_string(),
version: "0.1.0".to_string(),
entity: "service_1".to_string(),
key: "key_1".to_string(),
value: rmpv::Value::Boolean(true),
}
}
}
////////////////////////////////////////////////////////////////////////////////
// UserDef
////////////////////////////////////////////////////////////////////////////////
/// User definition.
///
/// Describes either a user or a role (see [`UserDef::is_role`]); records of
/// the _pico_user global table have the layout given by [`UserDef::format`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserDef {
    /// User (or role) id.
    pub id: UserId,
    /// User (or role) name.
    pub name: String,
    /// Schema version this definition belongs to.
    pub schema_version: u64,
    /// Authentication data; the table field is nullable.
    /// NOTE(review): presumably `None` for roles — confirm.
    pub auth: Option<AuthDef>,
    /// Id of the user owning this user or role.
    pub owner: UserId,
    /// Whether this is a user or a role; serialized under the key "type".
    #[serde(rename = "type")]
    pub ty: UserMetadataKind,
}
// Lets `UserDef` be used where tarantool's `Encode` trait is required.
impl Encode for UserDef {}
impl UserDef {
/// Index of field "auth" in the space _pico_user format.
///
/// Index of first field is 0.
pub const FIELD_AUTH: usize = 3;
/// Index of field "name" in the space _pico_user format.
pub const FIELD_NAME: usize = 1;
#[inline(always)]
pub fn is_role(&self) -> bool {
matches!(self.ty, UserMetadataKind::Role)
}
/// Format of the _pico_user global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)).is_nullable(false),
Field::from(("name", FieldType::String)).is_nullable(false),
Field::from(("schema_version", FieldType::Unsigned)).is_nullable(false),
Field::from(("auth", FieldType::Array)).is_nullable(true),
Field::from(("owner", FieldType::Unsigned)).is_nullable(false),
Field::from(("type", FieldType::String)).is_nullable(false),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 69,
name: "david".into(),
schema_version: 421,
auth: Some(AuthDef::new(
tarantool::auth::AuthMethod::ChapSha1,
"".into(),
)),
owner: 42,
ty: UserMetadataKind::User,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// PrivilegeDef
////////////////////////////////////////////////////////////////////////////////
/// User id of the builtin user "guest".
///
/// Default "untrusted" user.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const GUEST_ID: UserId = 0;
/// User id of the builtin user "admin".
///
/// Note: the admin user id is used when we need to elevate privileges
/// because the current user doesn't have access to system spaces.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const ADMIN_ID: UserId = 1;
/// User id of the builtin role "public".
///
/// Pre-defined role, automatically granted to new users.
/// Granting some privilege to this role is equivalent to granting the privilege to everybody.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const PUBLIC_ID: UserId = 2;
/// User id of the builtin role "replication".
///
/// Role "replication" has the following grants:
/// - Read access to the "universe"
/// - Write access to the space "_cluster"
///
/// NOTE(review): unlike the other builtin ids this one is an `i64`, not a
/// [`UserId`]. Presumably this lets it be compared with the `i64` object id of
/// a privilege granted on a role; confirm before changing the type.
pub const ROLE_REPLICATION_ID: i64 = 3;
/// User id of the builtin role "super".
///
/// Users with this role have access to everything.
///
/// See also <https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/_user/#box-space-user>
pub const SUPER_ID: UserId = 31;
/// User id of the builtin user "pico_service".
///
/// A special user for internal communication between instances of picodata.
/// It is equivalent in its privileges to "admin". The only difference is that
/// only the automated rpc calls are performed as "pico_service".
pub const PICO_SERVICE_ID: UserId = 32;
/// Object id of the special builtin object "universe".
///
/// Object "universe" is basically an alias to the "whole database".
/// Granting access to "universe" is equivalent to granting access
/// to all objects of all types for which the privilege makes sense.
pub const UNIVERSE_ID: i64 = 0;
tarantool::define_str_enum! {
    /// Kind of schema object a privilege can be granted on (see
    /// [`PrivilegeDef`]). For the corresponding tarantool object type names
    /// see [`SchemaObjectType::as_tarantool`].
    pub enum SchemaObjectType {
        /// A table; called "space" in tarantool.
        Table = "table",
        Role = "role",
        /// A routine; called "function" in tarantool.
        Routine = "routine",
        User = "user",
        /// The whole database (see [`UNIVERSE_ID`]).
        Universe = "universe",
    }
}
impl SchemaObjectType {
    /// Returns the tarantool name of this object type.
    ///
    /// Tables are called "space" and routines "function" in tarantool. The
    /// other kinds use the same name in both systems.
    pub fn as_tarantool(&self) -> &'static str {
        // The match is deliberately exhaustive: when a new variant is added,
        // the compiler forces a decision about its tarantool name.
        match self {
            SchemaObjectType::Table => "space",
            SchemaObjectType::Routine => "function",
            SchemaObjectType::Role | SchemaObjectType::User | SchemaObjectType::Universe => {
                self.as_str()
            }
        }
    }
}
tarantool::define_str_enum! {
    /// Picodata privilege types. For correspondence with
    /// tarantool privilege types see [`PrivilegeType::as_tarantool`].
    ///
    /// For each variant it is described which SQL queries in picodata
    /// a user with this privilege can execute.
    pub enum PrivilegeType {
        /// Allows SQL queries: `SELECT`
        Read = "read",
        /// Allows SQL queries: `INSERT`, `UPDATE`, `DELETE`
        Write = "write",
        /// Allows SQL queries: `CALL`.
        /// Can also indicate that a role is granted to a user
        /// if the object type is role.
        Execute = "execute",
        /// Allows a user to log in when connecting to a picodata instance.
        /// Corresponds to tarantool's "usage" privilege.
        Login = "login",
        /// Allows SQL queries: `CREATE`
        Create = "create",
        /// Allows SQL queries: `DROP`
        Drop = "drop",
        /// Allows SQL queries: `ALTER`
        Alter = "alter",
    }
}
impl PrivilegeType {
    /// Converts picodata privilege to a string that can be passed to
    /// `box.schema.user.grant` as `privileges`.
    ///
    /// Only [`PrivilegeType::Login`] is renamed, to "usage". Every other
    /// privilege uses the same name in picodata and tarantool.
    pub fn as_tarantool(&self) -> &'static str {
        // Deliberately exhaustive: when a new privilege is added, the compiler
        // forces a decision about its tarantool name.
        match self {
            PrivilegeType::Login => "usage",
            PrivilegeType::Read
            | PrivilegeType::Write
            | PrivilegeType::Execute
            | PrivilegeType::Create
            | PrivilegeType::Drop
            | PrivilegeType::Alter => self.as_str(),
        }
    }
}
/// Privilege definition.
///
/// Note the differences between picodata privileges and vanilla tarantool ones.
/// 1) The super role in picodata is a placeholder. It exists because picodata reuses user
///    ids from vanilla tarantool and super occupies an id. So to avoid assigning some picodata
///    user the id of the vanilla role super, we keep it as a placeholder. Aside from that,
///    the vanilla super role has some privileges that cannot be represented in picodata,
///    namely privileges on universe. This hole opens the possibility of unwanted edge cases.
/// 2) Only Login can be granted on Universe in picodata.
///
/// Note that validation is not performed in Deserialize. We assume that for untrusted data
/// the object is created via the constructor ([`PrivilegeDef::new`]). In other cases validation
/// was already performed prior to serialization, thus deserialization always creates a valid
/// object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PrivilegeDef {
    /// The privilege being granted.
    privilege: PrivilegeType,
    /// Kind of the target object.
    object_type: SchemaObjectType,
    /// `-1` denotes an absence of a target object.
    /// Other values should be >= 0 and denote an existing target object.
    /// When working with `object_type` `universe` it might seem that it does
    /// not have a target object and `object_id` should be `-1`. This is incorrect:
    /// universe has a target object with `object_id == 0`.
    ///
    /// To get the value of this field as `Option<u32>` see [`Self::object_id`]
    object_id: i64,
    /// Id of the user or role to whom the privilege is granted.
    ///
    /// In tarantool users and roles are stored in the same space, which means a
    /// role and a user cannot have the same id or name.
    grantee_id: UserId,
    /// Id of the grantor (presumably the user who granted the privilege).
    grantor_id: UserId,
    /// Schema version this definition belongs to.
    schema_version: u64,
}
// Lets `PrivilegeDef` be used where tarantool's `Encode` trait is required.
impl Encode for PrivilegeDef {}
/// Error returned by [`PrivilegeDef::new`] when a privilege cannot be granted
/// on the given kind of object.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub struct InvalidPrivilegeError {
    /// Kind of object the privilege was requested on.
    object_type: SchemaObjectType,
    /// The rejected privilege.
    unsupported: PrivilegeType,
    /// Privileges that are accepted for this kind of request.
    expected_one_of: &'static [PrivilegeType],
}

impl Display for InvalidPrivilegeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            object_type,
            unsupported,
            expected_one_of,
        } = self;
        write!(
            f,
            "Unsupported {} privilege '{}', expected one of {:?}",
            object_type, unsupported, expected_one_of,
        )
    }
}
impl PrivilegeDef {
    /// Creates a privilege definition after checking that `privilege` can be
    /// granted on `object_type`.
    ///
    /// The accepted privileges depend on the object type and on whether a
    /// target object is given (`object_id >= 0`) or not (`object_id == -1`).
    /// On [`SchemaObjectType::Universe`] only [`PrivilegeType::Login`] is
    /// accepted, whatever `object_id` is.
    ///
    /// # Errors
    /// Returns [`InvalidPrivilegeError`], which lists the accepted privileges,
    /// when `privilege` is not one of them.
    pub fn new(
        privilege: PrivilegeType,
        object_type: SchemaObjectType,
        object_id: i64,
        grantee_id: UserId,
        grantor_id: UserId,
        schema_version: u64,
    ) -> Result<PrivilegeDef, InvalidPrivilegeError> {
        use PrivilegeType::*;

        let candidate = PrivilegeDef {
            privilege,
            object_type,
            object_id,
            grantee_id,
            grantor_id,
            schema_version,
        };

        // `object_id()` is consulted at most once, and never for Universe.
        let allowed: &'static [PrivilegeType] = match object_type {
            SchemaObjectType::Universe => &[Login],
            SchemaObjectType::Table if candidate.object_id().is_some() => {
                &[Read, Write, Alter, Drop]
            }
            SchemaObjectType::Table => &[Read, Write, Create, Alter, Drop],
            SchemaObjectType::Role if candidate.object_id().is_some() => &[Execute, Drop],
            SchemaObjectType::Role => &[Create, Drop],
            SchemaObjectType::User if candidate.object_id().is_some() => &[Alter, Drop],
            SchemaObjectType::User => &[Create, Alter, Drop],
            SchemaObjectType::Routine if candidate.object_id().is_some() => &[Execute, Drop],
            SchemaObjectType::Routine => &[Create, Drop, Execute],
        };

        if allowed.contains(&privilege) {
            Ok(candidate)
        } else {
            Err(InvalidPrivilegeError {
                object_type,
                unsupported: privilege,
                expected_one_of: allowed,
            })
        }
    }

    /// Login privilege on the universe for `grantee_id`, granted by `grantor_id`.
    pub fn login(grantee_id: UserId, grantor_id: UserId, schema_version: u64) -> Self {
        let privilege = Self::new(
            PrivilegeType::Login,
            SchemaObjectType::Universe,
            UNIVERSE_ID,
            grantee_id,
            grantor_id,
            schema_version,
        );
        privilege.expect("cant fail, valid login privilege")
    }

    /// The granted privilege.
    #[inline(always)]
    pub fn privilege(&self) -> PrivilegeType {
        self.privilege
    }

    /// Kind of the target object.
    #[inline(always)]
    pub fn object_type(&self) -> SchemaObjectType {
        self.object_type
    }

    /// Get `object_id` field interpreting `-1` as `None`.
    ///
    /// In debug builds a negative value other than `-1` trips an assertion.
    #[inline(always)]
    pub fn object_id(&self) -> Option<u32> {
        if self.object_id < 0 {
            debug_assert_eq!(self.object_id, -1, "object_id should be >= -1");
            return None;
        }
        Some(self.object_id as _)
    }

    /// The raw `object_id` field, where `-1` means "no target object".
    #[inline(always)]
    pub fn object_id_raw(&self) -> i64 {
        self.object_id
    }

    /// Id of the user or role the privilege is granted to.
    #[inline(always)]
    pub fn grantee_id(&self) -> UserId {
        self.grantee_id
    }

    /// Id of the grantor.
    #[inline(always)]
    pub fn grantor_id(&self) -> UserId {
        self.grantor_id
    }

    /// Schema version this definition belongs to.
    #[inline(always)]
    pub fn schema_version(&self) -> u64 {
        self.schema_version
    }

    /// Format of the _pico_privilege global table.
    ///
    /// None of the fields is nullable. "object_id" is a signed integer because
    /// it may be `-1`.
    #[inline(always)]
    pub fn format() -> Vec<tarantool::space::Field> {
        use tarantool::space::Field;
        let required =
            |name: &'static str, ty: FieldType| Field::from((name, ty)).is_nullable(false);
        vec![
            required("privilege", FieldType::String),
            required("object_type", FieldType::String),
            required("object_id", FieldType::Integer),
            required("grantee_id", FieldType::Unsigned),
            required("grantor_id", FieldType::Unsigned),
            required("schema_version", FieldType::Unsigned),
        ]
    }

    /// A dummy instance of the type for use in tests.
    #[inline(always)]
    pub fn for_tests() -> Self {
        Self {
            privilege: PrivilegeType::Create,
            object_type: SchemaObjectType::User,
            object_id: -1,
            grantee_id: 37,
            grantor_id: 13,
            schema_version: 337,
        }
    }

    /// Retrieves object_name from system spaces based on `object_id` and `object_type`.
    /// Returns `Ok(None)` in the case when the privilege has no target object (e.g. `object_id == -1`)
    /// or when target object is universe.
    /// Returns `Err` in case when target object was not found in system tables.
    ///
    /// # Panics
    /// 1. On storage failure
    pub fn resolve_object_name(&self, storage: &Clusterwide) -> Result<Option<String>, Error> {
        let Some(id) = self.object_id() else {
            return Ok(None);
        };

        let lookup = match self.object_type {
            SchemaObjectType::Universe => {
                debug_assert_eq!(self.object_id, 0);
                return Ok(None);
            }
            SchemaObjectType::Table => storage.tables.get(id).map(|def| def.map(|def| def.name)),
            SchemaObjectType::Role | SchemaObjectType::User => {
                storage.users.by_id(id).map(|def| def.map(|def| def.name))
            }
            SchemaObjectType::Routine => {
                storage.routines.by_id(id).map(|def| def.map(|def| def.name))
            }
        };

        let name = lookup
            .expect("storage should not fail")
            .ok_or_else(|| Error::other(format!("object with id {id} should exist")))?;
        Ok(Some(name))
    }
}
////////////////////////////////////////////////////////////////////////////////
// system_user_definitions
////////////////////////////////////////////////////////////////////////////////
/// Definitions of builtin users & their respective privileges.
/// These should be inserted into "_pico_user" & "_pico_privilege" at cluster bootstrap.
pub fn system_user_definitions() -> Vec<(UserDef, Vec<PrivilegeDef>)> {
let mut result = vec![];
let initiator = ADMIN_ID;
//
// User "guest"
//
// equivalent SQL expression: CREATE USER 'guest' WITH PASSWORD '' USING chap-sha1
{
let user_def = UserDef {
id: GUEST_ID,
name: DEFAULT_USERNAME.into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
auth: Some(AuthDef::new(
AuthMethod::ChapSha1,
AuthData::new(&AuthMethod::ChapSha1, DEFAULT_USERNAME, "").into_string(),
)),
owner: initiator,
ty: UserMetadataKind::User,
};
let priv_defs = vec![
PrivilegeDef {
grantee_id: user_def.id,
privilege: PrivilegeType::Login,
object_type: SchemaObjectType::Universe,
object_id: UNIVERSE_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
},
PrivilegeDef {
grantee_id: user_def.id,
privilege: PrivilegeType::Execute,
object_type: SchemaObjectType::Role,
object_id: PUBLIC_ID as _,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
},
];
result.push((user_def, priv_defs));
}
//
// User "admin"
//
{
let user_def = UserDef {
id: ADMIN_ID,
name: "admin".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
// This place slightly differs from the tarantool
// implementation. In vanilla tarantool the auth_def is an empty
// MP_MAP. Here for simplicity given available module api we
// use ChapSha with invalid password (its impossible to get
// empty string as output of sha1)
auth: Some(AuthDef::new(AuthMethod::ChapSha1, "".into())),
owner: initiator,
ty: UserMetadataKind::User,
};
let mut priv_defs = Vec::with_capacity(PrivilegeType::VARIANTS.len());
// Grant all privileges on "universe" to "admin".
for &privilege in PrivilegeType::VARIANTS {
priv_defs.push(PrivilegeDef {
grantee_id: user_def.id,
privilege,
object_type: SchemaObjectType::Universe,
object_id: UNIVERSE_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
});
}
result.push((user_def, priv_defs));
}
//
// User "pico_service"
//
{
let user_def = UserDef {
id: PICO_SERVICE_ID,
name: PICO_SERVICE_USER_NAME.into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
auth: Some(AuthDef::new(
AuthMethod::ChapSha1,
tarantool::auth::AuthData::new(
&AuthMethod::ChapSha1,
PICO_SERVICE_USER_NAME,
pico_service_password(),
)
.into_string(),
)),
owner: initiator,
ty: UserMetadataKind::User,
};
let mut priv_defs = Vec::with_capacity(PrivilegeType::VARIANTS.len() + 1);
// Grant all privileges on "universe" to "pico_service".
for &privilege in PrivilegeType::VARIANTS {
priv_defs.push(PrivilegeDef {
grantee_id: user_def.id,
privilege,
object_type: SchemaObjectType::Universe,
object_id: UNIVERSE_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
});
}
// Role "replication" is needed explicitly
priv_defs.push(PrivilegeDef {
grantee_id: user_def.id,
privilege: PrivilegeType::Execute,
object_type: SchemaObjectType::Role,
object_id: ROLE_REPLICATION_ID,
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
grantor_id: initiator,
});
result.push((user_def, priv_defs));
}
result
}
/// Definitions of builtin roles & their respective privileges.
/// These should be inserted into "_pico_role" & "_pico_privilege" at cluster bootstrap.
// TODO: maybe this "_pico_role" should be merged with "_pico_user"
pub fn system_role_definitions() -> Vec<(UserDef, Vec<PrivilegeDef>)> {
let mut result = vec![];
let public_def = UserDef {
id: PUBLIC_ID,
name: "public".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
owner: ADMIN_ID,
auth: None,
ty: UserMetadataKind::Role,
};
let public_privs = vec![
// TODO:
// - we grant "execute" access to a bunch of stored procs
// - vanilla tarantool grants "read" access to a bunch of system spaces
];
result.push((public_def, public_privs));
let super_def = UserDef {
id: SUPER_ID,
name: "super".into(),
// This means the local schema is already up to date and main loop doesn't need to do anything
schema_version: INITIAL_SCHEMA_VERSION,
owner: ADMIN_ID,
auth: None,
ty: UserMetadataKind::Role,
};
let super_privs = vec![
// Special role, it's privileges are implicit
];
result.push((super_def, super_privs));
// There's also a "replication" builtin role, but we don't care about it?
result
}
////////////////////////////////////////////////////////////////////////////////
// init_pico_service
////////////////////////////////////////////////////////////////////////////////
/// Name of the special builtin user for internal communication between
/// instances. Its id is [`PICO_SERVICE_ID`].
///
/// Use this constant instead of literal "pico_service" so that it's easier to
/// find all the places where we refer to "pico_service".
pub const PICO_SERVICE_USER_NAME: &str = "pico_service";
/// Creates the builtin "pico_service" user directly in the local tarantool
/// system spaces (`_user`, `_priv`).
///
/// Does nothing if `_user` already has a user with [`PICO_SERVICE_ID`], for
/// example after an instance restart. Otherwise it takes the user definition
/// from [`system_user_definitions`] and creates the user. It then grants the
/// user all privileges on "universe" and execute on the "replication" role.
///
/// # Panics
/// - if reading `_user` fails;
/// - if the "pico_service" definition is missing from `system_user_definitions`;
/// - if creating the user or inserting either `_priv` tuple fails.
pub fn init_user_pico_service() {
let sys_user = SystemSpace::User.as_space();
let sys_priv = SystemSpace::Priv.as_space();
let t = sys_user
.get(&[PICO_SERVICE_ID])
.expect("reading from _user shouldn't fail");
if t.is_some() {
// Already exists (instance restarted)
// NOTE(review): only `_user` is checked. If an earlier run created the user
// but panicked before both `_priv` inserts below finished, those grants are
// never retried.
return;
}
let found = system_user_definitions()
.into_iter()
.find(|(user_def, _)| user_def.id == PICO_SERVICE_ID);
// Only the user definition is used here. The privileges are inserted as raw
// `_priv` tuples below instead of from the `PrivilegeDef`s.
let Some((user_def, _)) = &found else {
panic!("Couldn't find definition for '{PICO_SERVICE_USER_NAME}' system user");
};
// NOTE(review): meaning of the `false` flag is defined by
// `storage::acl::on_master_create_user`; confirm there.
let res = storage::acl::on_master_create_user(user_def, false);
if let Err(e) = res {
panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
}
// Grant ALL privileges to "every object of every type".
// The `_priv` tuple here is (grantor, grantee, object type, object id, privilege bitmask).
const PRIVILEGE_ALL: u32 = 0xffff_ffff;
let res = sys_priv.insert(&(
ADMIN_ID,
PICO_SERVICE_ID,
"universe",
UNIVERSE_ID,
PRIVILEGE_ALL,
));
// The panic message mentions user creation even though this step is a grant.
if let Err(e) = res {
panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
}
// Also grant role "replication", because all privileges to "every object of
// every type" is not enough.
// Presumably tarantool's bit for the "execute" privilege; granting execute on
// a role grants the role.
const PRIVILEGE_EXECUTE: u32 = 4;
let res = sys_priv.insert(&(
ADMIN_ID,
PICO_SERVICE_ID,
"role",
ROLE_REPLICATION_ID,
PRIVILEGE_EXECUTE,
));
if let Err(e) = res {
panic!("failed creating user '{PICO_SERVICE_USER_NAME}': {e}");
}
}
////////////////////////////////////////////////////////////////////////////////
// RoutineDef
////////////////////////////////////////////////////////////////////////////////
/// Routine kind.
///
/// Serialized in lowercase (`"procedure"`), which is also its `Display` text.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineKind {
    #[default]
    Procedure,
}
impl Display for RoutineKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Procedure => "procedure",
        };
        f.write_str(text)
    }
}
/// Parameter mode.
///
/// Serialized in lowercase (`"in"`), which is also its `Display` text.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineParamMode {
    #[default]
    In,
}
impl Display for RoutineParamMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::In => "in",
        };
        f.write_str(text)
    }
}
/// Routine parameter definition.
///
/// `mode` and `default` may be omitted when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutineParamDef {
    #[serde(default)]
    pub mode: RoutineParamMode,
    pub r#type: FieldType,
    #[serde(default)]
    pub default: Option<IrValue>,
}
impl Display for RoutineParamDef {
    /// Formats as `mode: <mode>, type: <type>`, followed by
    /// `, default: <value>` when a default value is present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mode,
            r#type,
            default,
        } = self;
        write!(f, "mode: {}, type: {}", mode, r#type)?;
        match default {
            Some(value) => write!(f, ", default: {value}"),
            None => Ok(()),
        }
    }
}
impl Default for RoutineParamDef {
    /// An `in` parameter of type `scalar` without a default value.
    fn default() -> Self {
        Self {
            r#type: FieldType::Scalar,
            mode: Default::default(),
            default: None,
        }
    }
}
impl RoutineParamDef {
    /// Returns this definition with its type replaced by `r#type`.
    pub fn with_type(mut self, r#type: FieldType) -> Self {
        self.r#type = r#type;
        self
    }
}
/// Parameter definitions of a routine.
pub type RoutineParams = Vec<RoutineParamDef>;
/// Values describing what a routine returns.
pub type RoutineReturns = Vec<IrValue>;
/// Language a routine body is written in.
///
/// Serialized in lowercase (`"sql"`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineLanguage {
    #[default]
    SQL,
}
impl From<Language> for RoutineLanguage {
    /// Maps sbroad's DDL language onto the stored routine language.
    fn from(language: Language) -> Self {
        match language {
            Language::SQL => RoutineLanguage::SQL,
        }
    }
}
/// Security mode of a routine; the only variant is `Invoker`.
///
/// Serialized in lowercase (`"invoker"`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RoutineSecurity {
    #[default]
    Invoker,
}
/// Routine definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutineDef {
pub id: RoutineId,
pub name: String,
pub kind: RoutineKind,
pub params: RoutineParams,
pub returns: RoutineReturns,
pub language: RoutineLanguage,
pub body: String,
pub security: RoutineSecurity,
pub operable: bool,
pub schema_version: u64,
pub owner: UserId,
}
impl Encode for RoutineDef {}
impl RoutineDef {
/// Index (0-based) of field "operable" in _pico_routine table format.
pub const FIELD_OPERABLE: usize = 8;
/// Index (0-based) of field "name" in _pico_routine table format.
pub const FIELD_NAME: usize = 1;
/// Format of the _pico_routine global table.
#[inline(always)]
pub fn format() -> Vec<tarantool::space::Field> {
use tarantool::space::Field;
vec![
Field::from(("id", FieldType::Unsigned)).is_nullable(false),
Field::from(("name", FieldType::String)).is_nullable(false),
Field::from(("kind", FieldType::String)).is_nullable(false),
Field::from(("params", FieldType::Array)).is_nullable(false),
Field::from(("returns", FieldType::Array)).is_nullable(false),
Field::from(("language", FieldType::String)).is_nullable(false),
Field::from(("body", FieldType::String)).is_nullable(false),
Field::from(("security", FieldType::String)).is_nullable(false),
Field::from(("operable", FieldType::Boolean)).is_nullable(false),
Field::from(("schema_version", FieldType::Unsigned)).is_nullable(false),
Field::from(("owner", FieldType::Unsigned)).is_nullable(false),
]
}
/// A dummy instance of the type for use in tests.
#[inline(always)]
pub fn for_tests() -> Self {
Self {
id: 16005,
name: "proc".into(),
kind: RoutineKind::Procedure,
params: vec![
RoutineParamDef {
mode: RoutineParamMode::In,
r#type: FieldType::String,
default: Some(IrValue::String("hello".into())),
},
RoutineParamDef {
mode: RoutineParamMode::In,
r#type: FieldType::Unsigned,
default: None,
},
],
returns: vec![],
language: RoutineLanguage::SQL,
body: "values (?), (?)".into(),
security: RoutineSecurity::Invoker,
operable: true,
schema_version: 421,
owner: 42,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// ...
////////////////////////////////////////////////////////////////////////////////
// TODO: this should be a TryFrom in tarantool-module
/// Converts a space (table) field type into the corresponding index field type.
///
/// Returns `None` for the space field types that have no index counterpart in
/// this mapping: `any`, `interval` and `map`.
pub fn try_space_field_type_to_index_field_type(
    ft: tarantool::space::FieldType,
) -> Option<tarantool::index::FieldType> {
    use tarantool::index::FieldType as IFT;
    use tarantool::space::FieldType as SFT;
    match ft {
        SFT::Any => None,
        SFT::Unsigned => Some(IFT::Unsigned),
        SFT::String => Some(IFT::String),
        SFT::Number => Some(IFT::Number),
        SFT::Double => Some(IFT::Double),
        SFT::Integer => Some(IFT::Integer),
        SFT::Boolean => Some(IFT::Boolean),
        SFT::Varbinary => Some(IFT::Varbinary),
        SFT::Scalar => Some(IFT::Scalar),
        SFT::Decimal => Some(IFT::Decimal),
        SFT::Uuid => Some(IFT::Uuid),
        SFT::Datetime => Some(IFT::Datetime),
        SFT::Interval => None,
        SFT::Array => Some(IFT::Array),
        SFT::Map => None,
    }
}
/// Errors related to DDL operations.
///
/// The `CreateTable`, `CreateRoutine` and `CreateIndex` variants wrap the
/// corresponding validation errors and display them unchanged.
#[derive(Debug, thiserror::Error)]
pub enum DdlError {
/// A `CREATE TABLE` request is invalid.
#[error("{0}")]
CreateTable(#[from] CreateTableError),
/// The DDL operation was aborted.
#[error("ddl operation was aborted")]
Aborted,
/// An operation expected a pending DDL but found none.
#[error("there is no pending ddl operation")]
NoPendingDdl,
/// A `CREATE PROCEDURE` request conflicts with an existing routine.
#[error("{0}")]
CreateRoutine(#[from] CreateRoutineError),
/// A `CREATE INDEX` request is invalid.
#[error("{0}")]
CreateIndex(#[from] CreateIndexError),
}
/// Reasons a `CREATE TABLE` request can be rejected.
#[derive(Debug, thiserror::Error)]
pub enum CreateTableError {
    /// A space with the requested id exists but has another name.
    #[error("space with id {id} exists with a different name '{actual_name}', but expected '{expected_name}'")]
    ExistsWithDifferentName {
        id: SpaceId,
        expected_name: String,
        actual_name: String,
    },
    /// A field name (or sharding key part) is repeated.
    #[error("several fields have the same name: {0}")]
    DuplicateFieldName(String),
    /// A referenced field is not part of the format.
    #[error("no field with name: {0}")]
    FieldUndefined(String),
    /// A sharded table sets neither `by_field` nor `sharding_key`.
    #[error("distribution is `sharded`, but neither `by_field` nor `sharding_key` is set")]
    ShardingPolicyUndefined,
    /// A sharded table sets both `by_field` and `sharding_key`.
    #[error("only one of sharding policy fields (`by_field`, `sharding_key`) should be set")]
    ConflictingShardingPolicy,
    /// A global table requested an engine other than memtx.
    #[error("global spaces only support memtx engine")]
    IncompatibleGlobalSpaceEngine,
    /// The requested tier is unknown.
    #[error("specified tier '{tier_name}' doesn't exist")]
    UnexistingTier { tier_name: String },
    /// The requested tier has no instances.
    #[error("specified tier '{tier_name}' doesn't contain at least one instance")]
    EmptyTier { tier_name: String },
}
impl From<CreateTableError> for Error {
    /// Wraps the error into [`DdlError::CreateTable`].
    fn from(cause: CreateTableError) -> Self {
        let ddl_error = DdlError::CreateTable(cause);
        ddl_error.into()
    }
}
/// Reasons a `CREATE INDEX` request can be rejected.
#[derive(Debug, thiserror::Error)]
pub enum CreateIndexError {
    #[error("index {name} already exists with a different signature")]
    IndexAlreadyExists { name: String },
    #[error("table {table_name} not found")]
    TableNotFound { table_name: String },
    #[error("table {table_name} is not operable")]
    TableNotOperable { table_name: String },
    #[error("no field with name: {name}")]
    FieldUndefined { name: String },
    #[error("table engine {engine} does not support option {option}")]
    IncompatibleTableEngineOption { engine: String, option: String },
    #[error("index type {ty} does not support option {option}")]
    IncompatibleIndexTypeOption { ty: String, option: String },
    #[error("index type {ty} does not support column type {ctype}")]
    IncompatibleIndexColumnType { ty: String, ctype: String },
    // The variant name keeps its historical spelling, since callers may refer to it.
    #[error("index type {ty} does not support nullable columns")]
    IncompatipleNullableColumn { ty: String },
    #[error("unique index for the sharded table must duplicate its sharding key columns")]
    IncompatibleUniqueIndexColumns,
    #[error("index type {ty} does not support unique indexes")]
    UniqueIndexType { ty: String },
    #[error("index type {ty} does not support non-unique indexes")]
    NonUniqueIndexType { ty: String },
    #[error("index type {ty} does not support multiple columns")]
    IncompatibleIndexMultipleColumns { ty: String },
}
impl From<CreateIndexError> for Error {
    /// Wraps the error into [`DdlError::CreateIndex`].
    fn from(cause: CreateIndexError) -> Self {
        let ddl_error = DdlError::CreateIndex(cause);
        ddl_error.into()
    }
}
/// Conflicts between a `CREATE PROCEDURE` request and an existing routine of
/// the same name; each variant names the first attribute that differs.
#[derive(Debug, thiserror::Error)]
pub enum CreateRoutineError {
    #[error("routine {name} already exists with a different kind")]
    ExistsWithDifferentKind { name: String },
    #[error("routine {name} already exists with different parameters")]
    ExistsWithDifferentParams { name: String },
    #[error("routine {name} already exists with a different language")]
    ExistsWithDifferentLanguage { name: String },
    #[error("routine {name} already exists with a different body")]
    ExistsWithDifferentBody { name: String },
    #[error("routine {name} already exists with a different security")]
    ExistsWithDifferentSecurity { name: String },
    #[error("routine {name} already exists with a different owner")]
    ExistsWithDifferentOwner { name: String },
}
impl From<CreateRoutineError> for Error {
    /// Wraps the error into [`DdlError::CreateRoutine`].
    fn from(cause: CreateRoutineError) -> Self {
        let ddl_error = DdlError::CreateRoutine(cause);
        ddl_error.into()
    }
}
// TODO: Add `LuaRead` to tarantool::space::Field and use it
/// A table column definition that can be read from Lua (derives `LuaRead`).
///
/// Converts into [`tarantool::space::Field`].
#[derive(Clone, Debug, LuaRead)]
pub struct Field {
    pub name: String,
    // TODO(gmoshkin): &str
    pub r#type: FieldType,
    pub is_nullable: bool,
}
impl From<Field> for tarantool::space::Field {
    fn from(field: Field) -> Self {
        // Only the type member is named differently on the tarantool side.
        let Field {
            name,
            r#type: field_type,
            is_nullable,
        } = field;
        Self {
            name,
            field_type,
            is_nullable,
        }
    }
}
// Distribution requested in `CreateTableParams::distribution`: "global"
// (the default) or "sharded". `CreateTableParams::validate` requires a sharded
// table to set exactly one of `by_field` / `sharding_key`.
::tarantool::define_str_enum! {
#[derive(Default)]
pub enum DistributionParam {
#[default]
Global = "global",
Sharded = "sharded",
}
}
/// Parameters of a request to rename the routine `old_name` to `new_name`.
#[derive(Clone, Debug)]
pub struct RenameRoutineParams {
    pub new_name: String,
    pub old_name: String,
    /// Parameter definitions from the request, if any were given.
    pub params: Option<Vec<ParamDef>>,
}
impl RenameRoutineParams {
    /// Whether a function named `old_name` exists in the local `_function` space.
    pub fn func_exists(&self) -> bool {
        let name = self.old_name.as_str();
        func_exists(name)
    }
    /// Whether a function named `new_name` already exists in the local `_function` space.
    pub fn new_name_occupied(&self) -> bool {
        let name = self.new_name.as_str();
        func_exists(name)
    }
}
/// Parameters of a `CREATE PROCEDURE` request.
///
/// `CreateProcParams::validate` compares params, language, body, security
/// and owner against an existing routine of the same name.
#[derive(Clone, Debug)]
pub struct CreateProcParams {
/// Routine name.
pub name: String,
/// Parameter definitions.
pub params: RoutineParams,
/// Language of `body`.
pub language: RoutineLanguage,
/// Routine body text.
pub body: String,
/// Security mode.
pub security: RoutineSecurity,
/// Id of the owning user.
pub owner: UserId,
}
/// Returns `true` if the local `_function` system space contains a function
/// named `name`.
///
/// # Panics
/// If `_function` has no "name" index or reading from it fails.
fn func_exists(name: &str) -> bool {
    Space::from(SystemSpace::Func)
        .index_cached("name")
        .expect("_function should have an index by name")
        .get(&[name])
        .expect("reading from _function shouldn't fail")
        .is_some()
}
impl CreateProcParams {
    /// Returns `true` if a function named `self.name` exists in the local
    /// `_function` system space.
    pub fn func_exists(&self) -> bool {
        let name = self.name.as_str();
        func_exists(name)
    }

    /// Checks that the procedure does not conflict with an existing routine.
    ///
    /// Returns `Ok(())` in two cases: no routine named `self.name` exists, or
    /// the existing one is a procedure with identical params, language, body,
    /// security and owner. Otherwise returns the [`CreateRoutineError`] for the
    /// first mismatching attribute, checked in that order with kind first.
    pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
        let existing = with_su(ADMIN_ID, || storage.routines.by_name(&self.name))??;
        let Some(def) = existing else {
            return Ok(());
        };
        let mismatch = if def.kind != RoutineKind::Procedure {
            CreateRoutineError::ExistsWithDifferentKind { name: def.name }
        } else if def.params != self.params {
            CreateRoutineError::ExistsWithDifferentParams { name: def.name }
        } else if def.language != self.language {
            CreateRoutineError::ExistsWithDifferentLanguage { name: def.name }
        } else if def.body != self.body {
            CreateRoutineError::ExistsWithDifferentBody { name: def.name }
        } else if def.security != self.security {
            CreateRoutineError::ExistsWithDifferentSecurity { name: def.name }
        } else if def.owner != self.owner {
            CreateRoutineError::ExistsWithDifferentOwner { name: def.name }
        } else {
            return Ok(());
        };
        Err(mismatch.into())
    }
}
/// Parameters of a `CREATE INDEX` request.
#[derive(Clone, Debug)]
pub struct CreateIndexParams {
/// Name of the new index.
pub(crate) name: String,
/// Name of the table being indexed.
pub(crate) space_name: String,
/// Names of the indexed columns, in index part order.
pub(crate) columns: Vec<String>,
/// Index type (tree, hash, rtree, bitset).
pub(crate) ty: IndexType,
/// Index options; `validate` checks them against the engine and index type.
pub(crate) opts: Vec<IndexOption>,
/// Id of the user issuing the request; passed into `Ddl::CreateIndex`.
pub(crate) initiator: UserId,
}
impl CreateIndexParams {
    /// Returns `true` if the table `self.space_name` exists locally and the
    /// local `_index` space already has an index named `self.name` on it.
    ///
    /// # Panics
    /// If `_index` has no "name" index or reading from it fails.
    pub fn index_exists(&self) -> bool {
        let sys_index = Space::from(SystemSpace::Index);
        let name_idx = sys_index
            .index_cached("name")
            .expect("_index should have an index by table_id and name");
        let Some(space) = Space::find_cached(&self.space_name) else {
            return false;
        };
        let idx = name_idx
            .get(&(&space.id(), &self.name))
            .expect("reading from _index shouldn't fail");
        idx.is_some()
    }

    /// Looks up the definition of the target table in the clusterwide storage.
    ///
    /// # Errors
    /// Returns [`CreateIndexError::TableNotFound`] if there is no such table,
    /// or a storage error.
    pub fn table(&self, storage: &Clusterwide) -> traft::Result<TableDef> {
        let table = with_su(ADMIN_ID, || storage.tables.by_name(&self.space_name))??;
        let Some(table) = table else {
            return Err(CreateIndexError::TableNotFound {
                table_name: self.space_name.clone(),
            })?;
        };
        Ok(table)
    }

    /// Chooses the id for the new index: one more than the highest index id
    /// of the target space in the local `_vindex`.
    ///
    /// # Errors
    /// Returns [`CreateIndexError::TableNotFound`] if the space does not exist
    /// locally, or an error from the `_vindex` select.
    ///
    /// # Panics
    /// If the space has no indexes at all (no primary key).
    pub fn next_index_id(&self) -> traft::Result<IndexId> {
        let Some(space) = Space::find_cached(&self.space_name) else {
            return Err(CreateIndexError::TableNotFound {
                table_name: self.space_name.clone(),
            })?;
        };
        // The logic is taken from the box.schema.index.create function
        // (box/lua/schema.lua). `_vindex` tuples start with
        // `[space_id, index_id, ...]`, so the first tuple at or below
        // `[space_id]` in descending order is the highest index of this space.
        let sys_vindex = Space::from(SystemSpace::VIndex);
        let mut iter = sys_vindex
            .primary_key()
            .select(IteratorType::LE, &[space.id()])?;
        let Some(tuple) = iter.next() else {
            panic!("no primary key found for space {}", self.space_name);
        };
        // If this space has no indexes, the iterator yields an index of some
        // preceding space instead. Check the space id explicitly, as
        // box.schema.index.create does, rather than reusing that index's id.
        let owner_space_id: Option<SpaceId> = tuple.get(0);
        if owner_space_id != Some(space.id()) {
            panic!("no primary key found for space {}", self.space_name);
        }
        let id: IndexId = tuple.get(1).expect("index id should be present");
        Ok(id + 1)
    }

    /// Builds index parts for `self.columns` from the table's format.
    ///
    /// # Errors
    /// - [`CreateIndexError::FieldUndefined`] for an unknown column;
    /// - [`CreateIndexError::IncompatibleIndexColumnType`] for a column type
    ///   with no index field type counterpart;
    /// - any error from [`Self::table`].
    pub fn parts(&self, storage: &Clusterwide) -> traft::Result<Vec<Part>> {
        let table = self.table(storage)?;
        let mut parts = Vec::with_capacity(self.columns.len());
        for column_name in &self.columns {
            let found = table.format.iter().find(|c| &c.name == column_name);
            let Some(column) = found else {
                return Err(CreateIndexError::FieldUndefined {
                    name: column_name.clone(),
                }
                .into());
            };
            let index_field_type = try_space_field_type_to_index_field_type(column.field_type)
                .ok_or_else(|| CreateIndexError::IncompatibleIndexColumnType {
                    ty: self.ty.to_string(),
                    ctype: column.field_type.to_string(),
                })?;
            let part = Part {
                field: NumOrStr::Str(column_name.into()),
                r#type: Some(index_field_type),
                collation: None,
                is_nullable: Some(column.is_nullable),
                path: None,
            };
            parts.push(part);
        }
        Ok(parts)
    }

    /// Returns `true` if `index` belongs to table `table_id` and has the same
    /// name, type and options as these params. Columns are not compared.
    fn is_duplicate(&self, table_id: SpaceId, index: IndexDef) -> bool {
        table_id == index.table_id
            && self.name == index.name
            && self.ty == index.ty
            && index.opts_equal(self.opts.as_slice())
    }

    /// Converts the request into a [`Ddl::CreateIndex`] operation, resolving
    /// the table id, index parts and the next free index id.
    pub fn into_ddl(&self, storage: &Clusterwide) -> Result<Ddl, Error> {
        let table = self.table(storage)?;
        let by_fields = self.parts(storage)?;
        let index_id = self.next_index_id()?;
        let ddl = Ddl::CreateIndex {
            space_id: table.id,
            index_id,
            name: self.name.clone(),
            ty: self.ty,
            by_fields,
            opts: self.opts.clone(),
            initiator: self.initiator,
        };
        Ok(ddl)
    }

    /// Validates the request against the current clusterwide schema.
    ///
    /// The following checks run in this order:
    /// - the table exists and is operable, and every column exists;
    /// - each option suits the table engine and the index type;
    /// - uniqueness suits the index type, and a unique index on an implicitly
    ///   sharded table repeats the sharding key columns;
    /// - the column count, nullability and types suit the index type;
    /// - the existing index with the same name (if any) is checked.
    pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
        let table = self.table(storage)?;
        if !table.operable {
            return Err(CreateIndexError::TableNotOperable {
                table_name: self.space_name.clone(),
            })?;
        }
        let mut field_names: AHashSet<&str> = AHashSet::with_capacity(table.format.len());
        for field in &table.format {
            field_names.insert(field.name.as_str());
        }
        for column in &self.columns {
            if !field_names.contains(column.as_str()) {
                return Err(CreateIndexError::FieldUndefined {
                    name: column.clone(),
                })?;
            }
        }
        for opt in &self.opts {
            if table.engine != SpaceEngineType::Vinyl && opt.is_vinyl() {
                return Err(CreateIndexError::IncompatibleTableEngineOption {
                    engine: table.engine.to_string(),
                    option: opt.type_name().into(),
                })?;
            }
            if self.ty != IndexType::Rtree && opt.is_rtree() {
                return Err(CreateIndexError::IncompatibleIndexTypeOption {
                    ty: self.ty.to_string(),
                    option: opt.type_name().into(),
                })?;
            }
            if self.ty != IndexType::Tree && opt.is_tree() {
                return Err(CreateIndexError::IncompatibleIndexTypeOption {
                    ty: self.ty.to_string(),
                    option: opt.type_name().into(),
                })?;
            }
            if let &IndexOption::Unique(false) = opt {
                if self.ty == IndexType::Hash {
                    return Err(CreateIndexError::NonUniqueIndexType {
                        ty: self.ty.to_string(),
                    })?;
                }
            }
            if let &IndexOption::Unique(true) = opt {
                if self.ty == IndexType::Rtree || self.ty == IndexType::Bitset {
                    return Err(CreateIndexError::UniqueIndexType {
                        ty: self.ty.to_string(),
                    })?;
                }
                // Unique index for the sharded table must duplicate its sharding key columns.
                if let Distribution::ShardedImplicitly { sharding_key, .. } = &table.distribution {
                    if sharding_key.len() != self.columns.len() {
                        return Err(CreateIndexError::IncompatibleUniqueIndexColumns)?;
                    }
                    for (sharding_key, column) in sharding_key.iter().zip(&self.columns) {
                        if sharding_key != column {
                            return Err(CreateIndexError::IncompatibleUniqueIndexColumns)?;
                        }
                    }
                }
            }
        }
        // Used by index types that only support a single column.
        let check_multipart = |parts: &[Part]| -> Result<(), CreateIndexError> {
            if parts.len() > 1 {
                return Err(CreateIndexError::IncompatibleIndexMultipleColumns {
                    ty: self.ty.to_string(),
                });
            }
            Ok(())
        };
        // Used by index types that do not support nullable columns.
        let check_part_nullability = |part: &Part| -> Result<(), CreateIndexError> {
            if part.is_nullable == Some(true) {
                return Err(CreateIndexError::IncompatipleNullableColumn {
                    ty: self.ty.to_string(),
                });
            }
            Ok(())
        };
        // With `eq == true`, `types` is an allow-list: the part type must be one of them.
        // With `eq == false`, `types` is a deny-list: the part type must not be one of them.
        let check_part_type = |part: &Part,
                               eq: bool,
                               types: &[Option<IndexFieldType>]|
         -> Result<(), CreateIndexError> {
            if types.contains(&part.r#type) != eq {
                let ctype = part
                    .r#type
                    .as_ref()
                    .map(|t| t.to_string())
                    .unwrap_or_else(|| "unknown".into());
                return Err(CreateIndexError::IncompatibleIndexColumnType {
                    ty: self.ty.to_string(),
                    ctype,
                });
            }
            Ok(())
        };
        let parts = self.parts(storage)?;
        match self.ty {
            IndexType::Bitset => {
                check_multipart(&parts)?;
                for part in &parts {
                    check_part_nullability(part)?;
                    check_part_type(
                        part,
                        true,
                        &[
                            Some(IndexFieldType::Unsigned),
                            Some(IndexFieldType::String),
                            Some(IndexFieldType::Varbinary),
                        ],
                    )?;
                }
            }
            IndexType::Rtree => {
                check_multipart(&parts)?;
                for part in &parts {
                    check_part_nullability(part)?;
                    check_part_type(part, true, &[Some(IndexFieldType::Array)])?;
                }
            }
            IndexType::Hash => {
                for part in &parts {
                    check_part_nullability(part)?;
                    check_part_type(
                        part,
                        false,
                        &[Some(IndexFieldType::Array), Some(IndexFieldType::Datetime)],
                    )?;
                }
            }
            IndexType::Tree => {
                for part in &parts {
                    check_part_type(part, false, &[Some(IndexFieldType::Array)])?;
                }
            }
        }
        let index = with_su(ADMIN_ID, || storage.indexes.by_name(&self.name))??;
        if let Some(index) = index {
            // NOTE(review): the error text says "with a different signature", but
            // it is returned when `is_duplicate` is true. That means table, name,
            // type and options all match (columns are not compared). Confirm
            // whether the condition or the message reflects the intent.
            if self.is_duplicate(table.id, index) {
                return Err(CreateIndexError::IndexAlreadyExists {
                    name: self.name.clone(),
                })?;
            }
        }
        Ok(())
    }
}
/// Parameters of a `CREATE TABLE` request.
#[derive(Clone, Debug, LuaRead)]
pub struct CreateTableParams {
/// Explicit space id; when `None`, `choose_id_if_not_specified` picks one.
pub(crate) id: Option<SpaceId>,
/// Table name.
pub(crate) name: String,
/// Column definitions; names must be unique.
pub(crate) format: Vec<Field>,
/// Names of the primary key columns; each must be in `format`.
pub(crate) primary_key: Vec<String>,
/// Global or sharded distribution.
pub(crate) distribution: DistributionParam,
/// Single sharding column (sharded tables only; exclusive with `sharding_key`).
pub(crate) by_field: Option<String>,
/// Sharding key columns (sharded tables only; exclusive with `by_field`).
pub(crate) sharding_key: Option<Vec<String>>,
/// Sharding function; ignored (with a warning) when `by_field` is set or the table is global.
pub(crate) sharding_fn: Option<ShardingFn>,
/// Storage engine; `None` means the default engine. Global tables require memtx.
pub(crate) engine: Option<SpaceEngineType>,
/// Id of the owning user.
pub(crate) owner: UserId,
/// Tier for a sharded table; `None` means `DEFAULT_TIER`.
pub(crate) tier: Option<String>,
/// Timeout in seconds.
///
/// Specifying the timeout identifies how long user is ready to wait for ddl to be applied.
/// But it does not provide guarantees that a ddl will be aborted if wait for commit timeouts.
pub timeout: Option<f64>,
}
impl CreateTableParams {
/// Checks for the following conditions:
/// 1) specified tier exists
/// 2) specified tier contains at least one instance
///
/// Checks occur only in case of a sharded table. When no tier is specified,
/// `DEFAULT_TIER` is checked.
///
/// # Errors
/// [`CreateTableError::UnexistingTier`], [`CreateTableError::EmptyTier`], or a
/// storage error.
pub fn check_tier_exists(&self, storage: &Clusterwide) -> traft::Result<()> {
    // Global tables are not bound to a tier.
    if self.distribution != DistributionParam::Sharded {
        return Ok(());
    }
    let tier = self.tier.as_deref().unwrap_or(DEFAULT_TIER);
    let tier_name = || tier.to_string();
    if storage.tiers.by_name(tier)?.is_none() {
        return Err(CreateTableError::UnexistingTier {
            tier_name: tier_name(),
        }
        .into());
    }
    let has_instances = storage
        .instances
        .iter()?
        .any(|instance| instance.tier == tier);
    if !has_instances {
        return Err(CreateTableError::EmptyTier {
            tier_name: tier_name(),
        }
        .into());
    }
    Ok(())
}
/// Checks if space described by options already exists. Returns an error if
/// the space with given id exists, but has a different name.
///
/// When `self.id` is `None`, the lookup is done by name only.
///
/// # Panics
/// If reading from `_space` fails, or if the found tuple has no "name" field.
pub fn space_exists(&self) -> traft::Result<bool> {
    // The check is performed using `box.space` API, so that local spaces are counted too.
    let sys_space = Space::from(SystemSpace::Space);
    let Some(id) = self.id else {
        let sys_space_by_name = sys_space
            .index_cached("name")
            .expect("_space should have an index by name");
        let t = sys_space_by_name
            .get(&[&self.name])
            .expect("reading from _space shouldn't fail");
        return Ok(t.is_some());
    };
    let t = sys_space
        .get(&[id])
        .expect("reading from _space shouldn't fail");
    let Some(t) = t else {
        return Ok(false);
    };
    let existing_name: &str = t.get("name").expect("space metadata should contain a name");
    if existing_name != self.name {
        // TODO: check everything else is the same
        // https://git.picodata.io/picodata/picodata/picodata/-/issues/331
        return Err(CreateTableError::ExistsWithDifferentName {
            id,
            expected_name: self.name.clone(),
            actual_name: existing_name.into(),
        }
        .into());
    }
    Ok(true)
}
/// Validates the table parameters without touching any storage.
///
/// Logs warnings without failing in two cases: the requested id is in the
/// reserved range, or a sharding option is set that will be ignored.
///
/// # Errors
/// - [`CreateTableError::DuplicateFieldName`] for repeated field names or
///   repeated `sharding_key` parts;
/// - [`CreateTableError::FieldUndefined`] if a primary key part, `by_field`
///   or a `sharding_key` part names an unknown field;
/// - [`CreateTableError::IncompatibleGlobalSpaceEngine`] for a global table
///   with an engine other than memtx;
/// - [`CreateTableError::ShardingPolicyUndefined`] /
///   [`CreateTableError::ConflictingShardingPolicy`] if a sharded table sets
///   neither or both of `by_field` and `sharding_key`.
pub fn validate(&self) -> traft::Result<()> {
    // Ids up to SPACE_ID_INTERNAL_MAX are reserved: allowed, but warned about.
    if let Some(id) = self.id.filter(|&id| id <= SPACE_ID_INTERNAL_MAX) {
        crate::tlog!(Warning, "requested space id {id} is in the range 0..={SPACE_ID_INTERNAL_MAX} reserved for future use by picodata, you may have a conflict in a future version");
    }

    // Field names must be unique.
    let mut field_names = HashSet::new();
    let duplicate = self
        .format
        .iter()
        .find(|field| !field_names.insert(field.name.as_str()));
    if let Some(field) = duplicate {
        return Err(CreateTableError::DuplicateFieldName(field.name.clone()).into());
    }

    // Every primary key part must refer to a declared field.
    let unknown_pk_part = self
        .primary_key
        .iter()
        .find(|part| !field_names.contains(part.as_str()));
    if let Some(part) = unknown_pk_part {
        return Err(CreateTableError::FieldUndefined(part.clone()).into());
    }

    // Global tables only support the memtx engine.
    if self.distribution == DistributionParam::Global
        && self.engine.is_some_and(|engine| engine != SpaceEngineType::Memtx)
    {
        return Err(CreateTableError::IncompatibleGlobalSpaceEngine.into());
    }

    if self.distribution != DistributionParam::Sharded {
        // Sharding options have no effect on a global table.
        if self.by_field.is_some() {
            crate::tlog!(
                Warning,
                "`by_field` is specified but will be ignored, as `distribution` is `global`"
            );
        }
        if self.sharding_key.is_some() {
            crate::tlog!(
                Warning,
                "`sharding_key` is specified but will be ignored, as `distribution` is `global`"
            );
        }
        if self.sharding_fn.is_some() {
            crate::tlog!(
                Warning,
                "`sharding_fn` is specified but will be ignored, as `distribution` is `global`"
            );
        }
        return Ok(());
    }

    // A sharded table needs exactly one sharding policy, and that policy must
    // refer to declared fields.
    match (&self.by_field, &self.sharding_key) {
        (Some(_), Some(_)) => Err(CreateTableError::ConflictingShardingPolicy.into()),
        (None, None) => Err(CreateTableError::ShardingPolicyUndefined.into()),
        (Some(by_field), None) => {
            if !field_names.contains(by_field.as_str()) {
                return Err(CreateTableError::FieldUndefined(by_field.clone()).into());
            }
            if self.sharding_fn.is_some() {
                crate::tlog!(
                    Warning,
                    "`sharding_fn` is specified but will be ignored, as sharding `by_field` is set"
                );
            }
            Ok(())
        }
        (None, Some(sharding_key)) => {
            let mut seen_parts = HashSet::new();
            for part in sharding_key {
                if !field_names.contains(part.as_str()) {
                    return Err(CreateTableError::FieldUndefined(part.clone()).into());
                }
                // Sharding key parts must also be unique.
                if !seen_parts.insert(part.as_str()) {
                    return Err(CreateTableError::DuplicateFieldName(part.clone()).into());
                }
            }
            Ok(())
        }
    }
}
/// Create space and then rollback.
///
/// Should be used for checking if a space with these params can be created.
///
/// The space is created inside a transaction that is always rolled back, so
/// nothing is left behind on success. If `box.cfg.read_only` is set, it is
/// switched off for the dry run and switched back on afterwards.
///
/// # Errors
/// - the owner (`self.owner`) is not found in `storage.users`, or the lookup fails;
/// - evaluating either Lua snippet that toggles `box.cfg.read_only` fails;
///   if restoring fails, the instance is left writable;
/// - tarantool refuses to create the space with these parameters.
///
/// # Panics
/// - if `self.id` is `None`, i.e. the id was not chosen beforehand;
/// - if the transaction ends in any state other than a rollback.
pub fn test_create_space(&self, storage: &Clusterwide) -> traft::Result<()> {
let id = self.id.expect("space id should've been chosen by now");
// `create_space` takes the owner by name, so resolve the owner id to a name.
let user = storage
.users
.by_id(self.owner)?
.ok_or_else(|| Error::Other(format!("user with id {} not found", self.owner).into()))?
.name;
// TODO: This is needed because we do a dry-run of space creation to verify its parameters,
// which may fail if the operation is initiated on a read-only replica. For this reason we
// temporarily switch off the read-only mode. This however will stop working once we add support
// for synchronous transactions, because read-onlyness will be controlled by the internal raft machinery.
// At that point we will need to rewrite this code and implement the explicit verification of the parameters.
let lua = ::tarantool::lua_state();
// Returns the previous `read_only` value so it can be restored below.
let was_read_only: bool = lua.eval(
"local is_ro = box.cfg.read_only
if is_ro then
box.cfg { read_only = false }
end
return is_ro",
)?;
// The closure always returns `Err`, which forces a rollback:
// `Err(None)` means creation succeeded, and `Err(Some(e))` means creation failed with `e`.
let err = transaction(|| -> Result<(), Option<tarantool::error::Error>> {
// TODO: allow create_space to accept user by id
::tarantool::schema::space::create_space(
&self.name,
&SpaceCreateOptions {
if_not_exists: false,
engine: self.engine.unwrap_or_default(),
id: Some(id),
field_count: self.format.len() as u32,
user: Some(user),
space_type: SpaceType::Normal,
format: Some(
self.format
.iter()
.cloned()
.map(tarantool::space::Field::from)
.collect(),
),
},
)
.map_err(Some)?;
// Rollback space creation
Err(None)
})
.unwrap_err();
if was_read_only {
lua.exec("box.cfg { read_only = true }")?;
}
match err {
// Space was successfully created and rolled back
TransactionError::RolledBack(None) => Ok(()),
// Space creation failed
TransactionError::RolledBack(Some(err)) => Err(err.into()),
// Error during commit or rollback
err => panic!("transaction mechanism should not fail: {err:?}"),
}
}
/// Chooses an id for the new space if it's not set yet and sets `self.id`.
pub fn choose_id_if_not_specified(&mut self) -> traft::Result<()> {
let sys_space = Space::from(SystemSpace::Space);
let id = if let Some(id) = self.id {
id
} else {
let id_range_min = SPACE_ID_INTERNAL_MAX + 1;
let id_range_max = SPACE_ID_TEMPORARY_MIN;
let mut iter = sys_space.select(IteratorType::LT, &[id_range_max])?;
let tuple = iter.next().expect("there's always at least system spaces");
let mut max_id: SpaceId = tuple
.field(0)
.expect("space metadata should decode fine")
.expect("space id should always be present");
let find_next_unused_id = |start: SpaceId| -> Result<SpaceId, Error> {
let iter = sys_space.select(IteratorType::GE, &[start])?;
let mut next_id = start;
for tuple in iter {
let id: SpaceId = tuple
.field(0)
.expect("space metadata should decode fine")
.expect("space id should always be present");
if id != next_id {
// Found a hole in the id range.
return Ok(next_id);
}
next_id += 1;
}
Ok(next_id)
};
if max_id < id_range_min {
max_id = id_range_min;
}
let mut id = find_next_unused_id(max_id)?;
if id >= id_range_max {
id = find_next_unused_id(id_range_min)?;
if id >= id_range_max {
set_error!(TarantoolErrorCode::CreateSpace, "space id limit is reached");
return Err(TarantoolError::last().into());
}
}
id
};
self.id = Some(id);
Ok(())
}
/// Converts the parameters into a [`Ddl::CreateTable`] operation.
///
/// For a sharded table without an explicit `tier`, `DEFAULT_TIER` is used.
/// If `by_field` is set, the table is sharded by that field. Otherwise it is
/// sharded implicitly by `sharding_key` using `sharding_fn`, or the default
/// sharding function when `sharding_fn` is unset.
///
/// Never returns an error; the `Result` is kept for interface stability.
///
/// # Panics
/// Panics if `self.id` is `None`, or if the table is sharded while neither
/// `by_field` nor `sharding_key` is set. `validate` rejects that case.
pub fn into_ddl(self) -> traft::Result<Ddl> {
    let id = self.id.expect("space id should've been chosen by now");
    let primary_key: Vec<_> = self.primary_key.into_iter().map(Part::field).collect();
    let format: Vec<_> = self
        .format
        .into_iter()
        .map(tarantool::space::Field::from)
        .collect();
    let distribution = match self.distribution {
        DistributionParam::Global => Distribution::Global,
        DistributionParam::Sharded => {
            // The tier wasn't specified explicitly. At this stage we are sure
            // that the chosen tier exists and isn't empty. The default is only
            // materialized when actually needed.
            let tier = self.tier.unwrap_or_else(|| DEFAULT_TIER.into());
            match self.by_field {
                Some(field) => Distribution::ShardedByField { field, tier },
                None => Distribution::ShardedImplicitly {
                    sharding_key: self
                        .sharding_key
                        .expect("should be checked during `validate`"),
                    sharding_fn: self.sharding_fn.unwrap_or_default(),
                    tier,
                },
            }
        }
    };
    let res = Ddl::CreateTable {
        id,
        name: self.name,
        format,
        primary_key,
        distribution,
        engine: self.engine.unwrap_or_default(),
        owner: self.owner,
    };
    Ok(res)
}
}
/// Lowest space id of the range that tarantool reserves for temporary spaces.
///
/// Temporary spaces get their ids from a dedicated range so that they cannot
/// clash with spaces defined on replicas. The value must match tarantool's
/// `box.schema.SPACE_ID_TEMPORARY_MIN`, see
/// <https://git.picodata.io/picodata/tarantool/-/blob/5c3c8ed32c7a9c84a0e86c8453269f0925ce63ed/src/box/schema_def.h#L67>
const SPACE_ID_TEMPORARY_MIN: SpaceId = 0x4000_0000; // == 1 << 30
/// Waits for a pending ddl to be either `Committed` or `Aborted` by the governor.
///
/// Scans normal raft log entries applied after `prepare_commit` and stops at
/// the first `DdlCommit` or `DdlAbort` operation. Entries whose payload
/// cannot be decoded are treated as `Op::Nop`.
///
/// Returns the index of the `DdlCommit` entry, or `DdlError::Aborted` if a
/// `DdlAbort` entry is found first.
///
/// If `timeout` is reached earlier returns an error. Errors from accessing the
/// node or the raft storage are propagated as well.
pub fn wait_for_ddl_commit(
    prepare_commit: RaftIndex,
    timeout: Duration,
) -> traft::Result<RaftIndex> {
    let node = node::global()?;
    let raft_storage = &node.raft_storage;
    let deadline = fiber::clock().saturating_add(timeout);
    // Index of the last log entry that has already been inspected. Advancing
    // it avoids re-reading (and re-decoding) the same entries on every wakeup.
    let mut last_seen = prepare_commit;
    loop {
        let cur_applied = node.get_index();
        let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1, None)?;
        for entry in new_entries {
            if entry.entry_type != raft::prelude::EntryType::EntryNormal {
                continue;
            }
            let index = entry.index;
            let op = entry.into_op().unwrap_or(Op::Nop);
            match op {
                Op::DdlCommit => return Ok(index),
                Op::DdlAbort => return Err(DdlError::Aborted.into()),
                _ => (),
            }
        }
        // Never move below `prepare_commit`: entries up to it must not be
        // inspected, otherwise a commit/abort of an earlier DDL could match.
        last_seen = last_seen.max(cur_applied);
        node.wait_index(cur_applied + 1, deadline.duration_since(fiber::clock()))?;
    }
}
/// Aborts a pending DDL operation and waits for abort to be committed localy.
/// If `timeout` is reached earlier returns an error.
///
/// Returns an index of the corresponding DdlAbort raft entry, or an error if
/// there is no pending DDL operation.
pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> {
let node = node::global()?;
loop {
if node.storage.properties.pending_schema_change()?.is_none() {
return Err(DdlError::NoPendingDdl.into());
}
let index = node.get_index();
let term = raft::Storage::term(&node.raft_storage, index)?;
#[rustfmt::skip]
let predicate = cas::Predicate {
index,
term,
ranges: vec![
cas::Range::new(ClusterwideTable::Property).eq([PropertyName::PendingSchemaChange]),
cas::Range::new(ClusterwideTable::Property).eq([PropertyName::GlobalSchemaVersion]),
cas::Range::new(ClusterwideTable::Property).eq([PropertyName::NextSchemaVersion]),
],
};
let req = Request::new(Op::DdlAbort, predicate, effective_user_id())?;
// FIXME: this error handling is wrong, must retry if e.is_retriable()
let (index, term) = compare_and_swap(&req, timeout)?;
node.wait_index(index, timeout)?;
if raft::Storage::term(&node.raft_storage, index)? != term {
// leader switched - retry
continue;
}
return Ok(index);
}
}
/// Tests for `CreateTableParams` that run inside a tarantool instance
/// (`#[::tarantool::test]`): the dry-run space creation and `validate`.
///
/// NOTE(review): unlike `mod test` in this file, this module is not gated by
/// `#[cfg(test)]`; presumably the `#[::tarantool::test]` cases are compiled
/// into the regular binary and run inside tarantool. Confirm before gating it.
mod tests {
use tarantool::{auth::AuthMethod, space::FieldType};
use super::*;
/// Builds a test `Clusterwide` storage containing the `admin` user
/// (id `ADMIN_ID`), so that `test_create_space` can resolve the owner name.
fn storage() -> Clusterwide {
let storage = Clusterwide::for_tests();
storage
.users
.insert(&UserDef {
id: ADMIN_ID,
name: String::from("admin"),
schema_version: 0,
auth: Some(AuthDef::new(AuthMethod::ChapSha1, String::from(""))),
owner: ADMIN_ID,
ty: UserMetadataKind::User,
})
.unwrap();
storage
}
/// `CreateTableParams::test_create_space` succeeds for valid parameters
/// without leaving the space behind, and propagates tarantool errors.
#[::tarantool::test]
fn test_create_space() {
let storage = storage();
// A global memtx space with a valid format: the dry run succeeds and the
// space does not exist afterwards.
CreateTableParams {
id: Some(1337),
name: "friends_of_peppa".into(),
format: vec![
Field {
name: "id".into(),
r#type: FieldType::Number,
is_nullable: false,
},
Field {
name: "name".into(),
r#type: FieldType::String,
is_nullable: false,
},
],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.test_create_space(&storage)
.unwrap();
assert!(tarantool::space::Space::find("friends_of_peppa").is_none());
// `test_create_space` does not look at the sharding policy, so a sharded
// vinyl space without any sharding key still passes the dry run.
CreateTableParams {
id: Some(1337),
name: "friends_of_peppa".into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: Some(SpaceEngineType::Vinyl),
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.test_create_space(&storage)
.unwrap();
assert!(tarantool::space::Space::find("friends_of_peppa").is_none());
// Space id 0 is rejected by tarantool as reserved; the box error is
// returned to the caller.
let err = CreateTableParams {
id: Some(0),
name: "friends_of_peppa".into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.test_create_space(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"box error: CreateSpace: Failed to create space 'friends_of_peppa': space id 0 is reserved"
);
}
/// `CreateTableParams::validate` error cases and one successful case.
#[::tarantool::test]
fn ddl() {
let new_space = "new_space";
let new_id = 1;
let field1 = Field {
name: "field1".into(),
r#type: FieldType::Any,
is_nullable: false,
};
let field2 = Field {
name: "field2".into(),
r#type: FieldType::Any,
is_nullable: false,
};
// Duplicate field names in `format`.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone(), field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "several fields have the same name: field1");
// Primary key refers to a field missing from `format`.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![field2.name.clone()],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "no field with name: field2");
// Sharding key refers to a field missing from `format`.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: Some(vec![field2.name.clone()]),
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "no field with name: field2");
// Sharding key lists the same field twice.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]),
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "several fields have the same name: field1");
// `by_field` refers to a field missing from `format`.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "no field with name: field2");
// Sharded distribution with no sharding policy at all.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"distribution is `sharded`, but neither `by_field` nor `sharding_key` is set"
);
// Both `by_field` and `sharding_key` set.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: Some(vec![]),
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"only one of sharding policy fields (`by_field`, `sharding_key`) should be set"
);
// Valid: sharded by an existing field which is also the primary key.
CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone(), field2.clone()],
primary_key: vec![field2.name.clone()],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: None,
sharding_fn: None,
engine: None,
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap();
// Global spaces reject the vinyl engine.
let err = CreateTableParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1, field2.clone()],
primary_key: vec![field2.name],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
engine: Some(SpaceEngineType::Vinyl),
timeout: None,
owner: ADMIN_ID,
tier: None,
}
.validate()
.unwrap_err();
assert_eq!(err.to_string(), "global spaces only support memtx engine");
}
/// The hard-coded `SPACE_ID_TEMPORARY_MIN` must agree with the value
/// exported by tarantool as `box.schema.SPACE_ID_TEMPORARY_MIN`.
#[::tarantool::test]
fn test_space_id_temporary_min() {
let lua = tarantool::lua_state();
let id: SpaceId = lua
.eval("return box.schema.SPACE_ID_TEMPORARY_MIN")
.unwrap();
assert_eq!(id, SPACE_ID_TEMPORARY_MIN);
}
}
/// Unit tests checking that each definition's `format()` matches the tuples
/// it encodes to, and that `PrivilegeDef::new` validates privileges per
/// object type.
#[cfg(test)]
mod test {
    use super::*;
    use tarantool::tuple::ToTupleBuffer;

    #[test]
    #[rustfmt::skip]
    fn space_def_matches_format() {
        use ::tarantool::msgpack;
        let i = TableDef::for_tests();
        let tuple_data = msgpack::encode(&i);
        let format = TableDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "TableDef::format");
        assert_eq!(format[TableDef::FIELD_OPERABLE].name, "operable");
    }

    #[test]
    #[rustfmt::skip]
    fn index_def_matches_format() {
        let i = IndexDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = IndexDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "IndexDef::format");
        assert_eq!(format[IndexDef::FIELD_OPERABLE].name, "operable");
    }

    #[test]
    #[rustfmt::skip]
    fn user_def_matches_format() {
        let i = UserDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = UserDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "UserDef::format");
        assert_eq!(format[UserDef::FIELD_AUTH].name, "auth");
        assert_eq!(format[UserDef::FIELD_NAME].name, "name");
    }

    #[test]
    #[rustfmt::skip]
    fn privilege_def_matches_format() {
        let i = PrivilegeDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = PrivilegeDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PrivilegeDef::format");
    }

    /// Asserts that `PrivilegeDef::new` accepts exactly `valid_privileges` for
    /// the given `object_type`/`object_id`, and rejects every other privilege
    /// with an `InvalidPrivilegeError` listing `valid_privileges`.
    ///
    /// Failure messages name the offending privilege and object, so a broken
    /// rule is identifiable without re-running under a debugger.
    #[track_caller]
    fn check_object_privilege(
        object_type: SchemaObjectType,
        valid_privileges: &'static [PrivilegeType],
        object_id: i64,
    ) {
        for privilege in PrivilegeType::VARIANTS {
            let res = PrivilegeDef::new(*privilege, object_type, object_id, 1, 1, 1);
            let should_be_valid = valid_privileges.contains(privilege);
            match (should_be_valid, res) {
                (true, Ok(_)) => {}
                (true, Err(e)) => panic!(
                    "{privilege:?} on {object_type:?} (id {object_id}) should be accepted, got error: {e:?}"
                ),
                (false, Ok(def)) => panic!(
                    "{privilege:?} on {object_type:?} (id {object_id}) should be rejected, got: {def:?}"
                ),
                (false, Err(e)) => assert_eq!(
                    e,
                    InvalidPrivilegeError {
                        object_type,
                        unsupported: *privilege,
                        expected_one_of: valid_privileges,
                    },
                    "unexpected error for {privilege:?} on {object_type:?} (id {object_id})"
                ),
            }
        }
    }

    #[test]
    fn privilege_def_validation() {
        use PrivilegeType::*;

        // table
        let valid = &[Read, Write, Create, Alter, Drop];
        check_object_privilege(SchemaObjectType::Table, valid, -1);

        // particular table
        let valid = &[Read, Write, Alter, Drop];
        check_object_privilege(SchemaObjectType::Table, valid, 42);

        // user
        let valid = &[Create, Alter, Drop];
        check_object_privilege(SchemaObjectType::User, valid, -1);

        // particular user
        let valid = &[Alter, Drop];
        check_object_privilege(SchemaObjectType::User, valid, 42);

        // role
        let valid = &[Create, Drop];
        check_object_privilege(SchemaObjectType::Role, valid, -1);

        // particular role
        let valid = &[Execute, Drop];
        check_object_privilege(SchemaObjectType::Role, valid, 42);

        // universe
        let valid = &[Login];
        check_object_privilege(SchemaObjectType::Universe, valid, 0);
    }

    #[test]
    fn routine_def_matches_format() {
        let i = RoutineDef::for_tests();
        let tuple_data = i.to_tuple_buffer().unwrap();
        let format = RoutineDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "RoutineDef::format");
    }

    #[test]
    #[rustfmt::skip]
    fn plugin_def_matches_format() {
        let p = PluginDef::for_tests();
        let tuple_data = p.to_tuple_buffer().unwrap();
        let format = PluginDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PluginDef::format");
        assert_eq!(format[PluginDef::FIELD_ENABLE].name, "enabled");
    }

    #[test]
    #[rustfmt::skip]
    fn service_def_matches_format() {
        let s = ServiceDef::for_tests();
        let tuple_data = s.to_tuple_buffer().unwrap();
        let format = ServiceDef::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "ServiceDef::format");
    }

    #[test]
    #[rustfmt::skip]
    fn service_route_item_matches_format() {
        let s = ServiceRouteItem::for_tests();
        let tuple_data = s.to_tuple_buffer().unwrap();
        let format = ServiceRouteItem::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "ServiceRouteItem::format");
        assert_eq!(format[ServiceRouteItem::FIELD_POISON as usize].name, "poison");
    }

    #[test]
    #[rustfmt::skip]
    fn plugin_migration_matches_format() {
        let p = PluginMigrationRecord::for_tests();
        let tuple_data = p.to_tuple_buffer().unwrap();
        let format = PluginMigrationRecord::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PluginMigrationRecord::format");
    }

    #[test]
    #[rustfmt::skip]
    fn plugin_config_matches_format() {
        let s = PluginConfigRecord::for_tests();
        let tuple_data = s.to_tuple_buffer().unwrap();
        let format = PluginConfigRecord::format();
        crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PluginConfigRecord::format");
    }
}