diff --git a/CHANGELOG.md b/CHANGELOG.md index e2aa49f7325050f45b9284485a6d1569bec0f96f..80867aa0b8890babf2b51332b6f977dbb1f51730 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,11 +33,7 @@ with the `YY.MINOR.MICRO` scheme. - `instance.log_level` -> `instance.log.level` - `instance.memtx_memory` -> `instance.memtx.memory` -======= - Change _pico_index table structure. - -### SQL - - Support index creation with SQL. --> diff --git a/src/cas.rs b/src/cas.rs index d5c460cf5c06d6c30b4c94e1c8c7381ab32ba891..a0969dfdb90ee4fb5d265da2941751ab79956c8c 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -954,7 +954,7 @@ mod tests { space_id, index_id, name: "index1".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], by_fields: vec![], owner: ADMIN_ID, diff --git a/src/schema.rs b/src/schema.rs index 8296c4238e15f57739a22af3e9245d6dc15caec1..e7caff5d0785a47deacd33d85740c5f241c3142a 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,9 +1,12 @@ +use ahash::{AHashMap, AHashSet}; use sbroad::ir::ddl::{Language, ParamDef}; use std::borrow::Cow; use std::collections::{BTreeMap, HashSet}; use std::fmt::Display; +use std::str::FromStr; use std::time::Duration; -use tarantool::index::{IndexType, RtreeIndexDistanceType}; +use tarantool::decimal::Decimal; +use tarantool::index::{FieldType as IndexFieldType, IndexType, Part, RtreeIndexDistanceType}; use tarantool::auth::AuthData; use tarantool::auth::AuthDef; @@ -17,13 +20,13 @@ use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType}; use tarantool::space::{Metadata as SpaceMetadata, Space, SpaceType, SystemSpace}; use tarantool::transaction::{transaction, TransactionError}; use tarantool::{ + index::IndexId, index::IteratorType, index::Metadata as IndexMetadata, - index::{IndexId, Part}, space::SpaceId, tlua::{self, LuaRead}, tuple::Encode, - util::Value, + util::{NumOrStr, Value}, }; use sbroad::ir::value::Value as IrValue; @@ -217,42 +220,102 @@ fn default_bucket_id_field() -> String { // IndexDef //////////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +/// Picodata index options. +/// +/// While the tarantool module already includes an `IndexOptions` structure +/// for local indexes, this enum is designed for picodata indexes. +/// Currently, these options coincide, but we intend to introduce a +/// REDISTRIBUTION option for secondary indexes. It will be implemented +/// using materialized views instead of tarantool indexes. Therefore, +/// it's important to maintain a distinction between these features, +/// emphasizing that this enum is specific to picodata. +#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq)] pub enum IndexOption { - BloomFalsePositiveRate(f64), + /// Vinyl only. The false positive rate for the bloom filter. + #[serde(rename = "bloom_fpr")] + BloomFalsePositiveRate(Decimal), + /// The RTREE index dimension. + #[serde(rename = "dimension")] Dimension(u32), + /// The RTREE index distance type. + #[serde(rename = "distance")] Distance(RtreeIndexDistanceType), + /// Specify whether hint optimization is enabled for the TREE index. + /// If true, the index works faster; if false, the index size is reduced by half. + #[serde(rename = "hint")] + // FIXME: this option is disabled in the current version of the module. Hint(bool), - IfNotExists(bool), + /// Vinyl only. The page size in bytes used for read and write disk operations. + #[serde(rename = "page_size")] PageSize(u32), + /// Vinyl only.
The range size in bytes used for vinyl index. + #[serde(rename = "range_size")] RangeSize(u32), + /// Vinyl only. The number of runs per level in the LSM tree. + #[serde(rename = "run_count_per_level")] RunCountPerLevel(u32), - RunSizeRatio(f64), + /// Vinyl only. The ratio between the size of different levels in the LSM tree. + #[serde(rename = "run_size_ratio")] + RunSizeRatio(Decimal), + /// Specify whether the index is unique. When true, the index cannot contain duplicate values. + #[serde(rename = "unique")] Unique(bool), } -// We can safely derive `Eq` because `f64` is never `NaN`. -impl Eq for IndexOption {} - impl IndexOption { pub fn as_kv(&self) -> (Cow<'static, str>, Value<'_>) { + let dec_to_f64 = |d: Decimal| f64::from_str(&d.to_string()).expect("decimal to f64"); match self { - IndexOption::BloomFalsePositiveRate(rate) => ("bloom_fpr".into(), Value::Double(*rate)), + IndexOption::BloomFalsePositiveRate(rate) => { + ("bloom_fpr".into(), Value::Double(dec_to_f64(*rate))) + } IndexOption::Dimension(dim) => ("dimension".into(), Value::Num(*dim)), IndexOption::Distance(dist) => ("distance".into(), Value::Str(dist.as_str().into())), IndexOption::Hint(hint) => ("hint".into(), Value::Bool(*hint)), - IndexOption::IfNotExists(if_not_exists) => { - ("if_not_exists".into(), Value::Bool(*if_not_exists)) - } IndexOption::PageSize(size) => ("page_size".into(), Value::Num(*size)), IndexOption::RangeSize(size) => ("range_size".into(), Value::Num(*size)), IndexOption::RunCountPerLevel(count) => { ("run_count_per_level".into(), Value::Num(*count)) } - IndexOption::RunSizeRatio(ratio) => ("run_size_ratio".into(), Value::Double(*ratio)), + IndexOption::RunSizeRatio(ratio) => { + ("run_size_ratio".into(), Value::Double(dec_to_f64(*ratio))) + } IndexOption::Unique(unique) => ("unique".into(), Value::Bool(*unique)), } } + + pub fn is_vinyl(&self) -> bool { + matches!( + self, + IndexOption::BloomFalsePositiveRate(_) + | IndexOption::PageSize(_) + | IndexOption::RangeSize(_) + | IndexOption::RunCountPerLevel(_) + | IndexOption::RunSizeRatio(_) + ) + } + + pub fn is_rtree(&self) -> bool { + matches!(self, IndexOption::Dimension(_) | IndexOption::Distance(_)) + } + + pub fn is_tree(&self) -> bool { + matches!(self, IndexOption::Hint(_)) + } + + pub fn type_name(&self) -> &str { + match self { + IndexOption::BloomFalsePositiveRate(_) => "bloom_fpr", + IndexOption::Dimension(_) => "dimension", + IndexOption::Distance(_) => "distance", + IndexOption::Hint(_) => "hint", + IndexOption::PageSize(_) => "page_size", + IndexOption::RangeSize(_) => "range_size", + IndexOption::RunCountPerLevel(_) => "run_count_per_level", + IndexOption::RunSizeRatio(_) => "run_size_ratio", + IndexOption::Unique(_) => "unique", + } + } } /// Database index definition. 
@@ -263,7 +326,8 @@ pub struct IndexDef { pub table_id: SpaceId, pub id: IndexId, pub name: String, - pub itype: IndexType, + #[serde(rename = "type")] + pub ty: IndexType, pub opts: Vec<IndexOption>, pub parts: Vec<Part>, pub operable: bool, @@ -285,7 +349,7 @@ impl IndexDef { Field::from(("table_id", FieldType::Unsigned)), Field::from(("id", FieldType::Unsigned)), Field::from(("name", FieldType::String)), - Field::from(("itype", FieldType::String)), + Field::from(("type", FieldType::String)), Field::from(("opts", FieldType::Array)), Field::from(("parts", FieldType::Array)), Field::from(("operable", FieldType::Boolean)), @@ -301,7 +365,7 @@ impl IndexDef { table_id: 10569, id: 1, name: "secondary".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![], parts: vec![], operable: true, @@ -320,13 +384,26 @@ impl IndexDef { space_id: self.table_id, index_id: self.id, name: self.name.as_str().into(), - r#type: self.itype, + r#type: self.ty, opts, parts: self.parts.clone(), }; index_meta } + + pub fn opts_equal(&self, opts: &[IndexOption]) -> bool { + let mut set: AHashSet<&IndexOption> = AHashSet::with_capacity(self.opts.len()); + for opt in &self.opts { + set.insert(opt); + } + for opt in opts { + if !set.contains(opt) { + return false; + } + } + true + } } //////////////////////////////////////////////////////////////////////////////// @@ -1321,6 +1398,8 @@ pub enum DdlError { NoPendingDdl, #[error("{0}")] CreateRoutine(#[from] CreateRoutineError), + #[error("{0}")] + CreateIndex(#[from] CreateIndexError), } #[derive(Debug, thiserror::Error)] @@ -1349,6 +1428,40 @@ impl From<CreateTableError> for Error { } } +#[derive(Debug, thiserror::Error)] +pub enum CreateIndexError { + #[error("index {name} already exists with a different signature")] + IndexAlreadyExists { name: String }, + #[error("table {table_name} not found")] + TableNotFound { table_name: String }, + #[error("table {table_name} is not operable")] + TableNotOperable { table_name: String }, + #[error("no field with name: {name}")] + FieldUndefined { name: String }, + #[error("table engine {engine} does not support option {option}")] + IncompatibleTableEngineOption { engine: String, option: String }, + #[error("index type {ty} does not support option {option}")] + IncompatibleIndexTypeOption { ty: String, option: String }, + #[error("index type {ty} does not support column type {ctype}")] + IncompatibleIndexColumnType { ty: String, ctype: String }, + #[error("index type {ty} does not support nullable columns")] + IncompatibleNullableColumn { ty: String }, + #[error("unique index for the sharded table must duplicate its sharding key columns")] + IncompatibleUniqueIndexColumns, + #[error("index type {ty} does not support unique indexes")] + UniqueIndexType { ty: String }, + #[error("index type {ty} does not support non-unique indexes")] + NonUniqueIndexType { ty: String }, + #[error("index type {ty} does not support multiple columns")] + IncompatibleIndexMultipleColumns { ty: String }, +} + +impl From<CreateIndexError> for Error { + fn from(err: CreateIndexError) -> Self { + DdlError::CreateIndex(err).into() + } +} + #[derive(Debug, thiserror::Error)] pub enum CreateRoutineError { #[error("routine {name} already exists with a different kind")] @@ -1468,6 +1581,276 @@ impl CreateProcParams { } } +#[derive(Clone, Debug)] +pub struct CreateIndexParams { + pub(crate) name: String, + pub(crate) space_name: String, + pub(crate) columns: Vec<String>, + pub(crate) ty: IndexType, + pub(crate) opts: Vec<IndexOption>, + pub(crate)
owner: UserId, +} + +impl CreateIndexParams { + pub fn index_exists(&self) -> bool { + let sys_index = Space::from(SystemSpace::Index); + let name_idx = sys_index + .index_cached("name") + .expect("_index should have an index by table_id and name"); + let Some(space) = Space::find_cached(&self.space_name) else { + return false; + }; + let idx = name_idx + .get(&(&space.id(), &self.name)) + .expect("reading from _index shouldn't fail"); + idx.is_some() + } + + pub fn table(&self, storage: &Clusterwide) -> traft::Result<TableDef> { + let table = with_su(ADMIN_ID, || storage.tables.by_name(&self.space_name))??; + let Some(table) = table else { + return Err(CreateIndexError::TableNotFound { + table_name: self.space_name.clone(), + })?; + }; + Ok(table) + } + + pub fn next_index_id(&self) -> traft::Result<IndexId> { + let Some(space) = Space::find_cached(&self.space_name) else { + return Err(CreateIndexError::TableNotFound { + table_name: self.space_name.clone(), + })?; + }; + // The logic is taken from the box.schema.index.create function + // (box/lua/schema.lua). + let sys_vindex = Space::from(SystemSpace::VIndex); + let mut iter = sys_vindex + .primary_key() + .select(IteratorType::LE, &[space.id()])?; + let Some(tuple) = iter.next() else { + panic!("no primary key found for space {}", self.space_name); + }; + let id: IndexId = tuple.get(1).expect("index id should be present"); + Ok(id + 1) + } + + pub fn parts(&self, storage: &Clusterwide) -> traft::Result<Vec<Part>> { + let table = self.table(storage)?; + let mut parts = Vec::with_capacity(self.columns.len()); + + type Position = u32; + let mut positions: AHashMap<&str, Position> = AHashMap::with_capacity(table.format.len()); + for (i, field) in table.format.iter().enumerate() { + positions.insert(field.name.as_str(), i as Position); + } + for column_name in &self.columns { + let position = positions.get(column_name.as_str()).cloned(); + let position = position.ok_or_else(|| CreateIndexError::FieldUndefined { + name: column_name.clone(), + })?; + let column = &table.format[position as usize]; + let index_field_type = try_space_field_type_to_index_field_type(column.field_type) + .ok_or_else(|| CreateIndexError::IncompatibleIndexColumnType { + ty: self.ty.to_string(), + ctype: column.field_type.to_string(), + })?; + let part = Part { + field: NumOrStr::Num(position), + r#type: Some(index_field_type), + collation: None, + is_nullable: Some(column.is_nullable), + path: None, + }; + parts.push(part); + } + Ok(parts) + } + + fn is_duplicate(&self, table_id: SpaceId, index: IndexDef) -> bool { + table_id == index.table_id + && self.name == index.name + && self.ty == index.ty + && index.opts_equal(self.opts.as_slice()) + } + + pub fn into_ddl(&self, storage: &Clusterwide) -> Result<Ddl, Error> { + let table = self.table(storage)?; + let by_fields = self.parts(storage)?; + let index_id = self.next_index_id()?; + let ddl = Ddl::CreateIndex { + space_id: table.id, + index_id, + name: self.name.clone(), + ty: self.ty, + by_fields, + opts: self.opts.clone(), + owner: self.owner, + }; + Ok(ddl) + } + + pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> { + let table = self.table(storage)?; + if !table.operable { + return Err(CreateIndexError::TableNotOperable { + table_name: self.space_name.clone(), + })?; + } + let mut field_names: AHashSet<&str> = AHashSet::with_capacity(table.format.len()); + for field in &table.format { + field_names.insert(field.name.as_str()); + } + for column in &self.columns { + if
!field_names.contains(column.as_str()) { + return Err(CreateIndexError::FieldUndefined { + name: column.clone(), + })?; + } + } + + for opt in &self.opts { + if table.engine != SpaceEngineType::Vinyl && opt.is_vinyl() { + return Err(CreateIndexError::IncompatibleTableEngineOption { + engine: table.engine.to_string(), + option: opt.type_name().into(), + })?; + } + if self.ty != IndexType::Rtree && opt.is_rtree() { + return Err(CreateIndexError::IncompatibleIndexTypeOption { + ty: self.ty.to_string(), + option: opt.type_name().into(), + })?; + } + if self.ty != IndexType::Tree && opt.is_tree() { + return Err(CreateIndexError::IncompatibleIndexTypeOption { + ty: self.ty.to_string(), + option: opt.type_name().into(), + })?; + } + if let &IndexOption::Unique(false) = opt { + if self.ty == IndexType::Hash { + return Err(CreateIndexError::NonUniqueIndexType { + ty: self.ty.to_string(), + })?; + } + } + if let &IndexOption::Unique(true) = opt { + if self.ty == IndexType::Rtree || self.ty == IndexType::Bitset { + return Err(CreateIndexError::UniqueIndexType { + ty: self.ty.to_string(), + })?; + } + // Unique index for the sharded table must duplicate its sharding key columns. + if let Distribution::ShardedImplicitly { sharding_key, .. } = &table.distribution { + if sharding_key.len() != self.columns.len() { + return Err(CreateIndexError::IncompatibleUniqueIndexColumns)?; + } + for (sharding_key, column) in sharding_key.iter().zip(&self.columns) { + if sharding_key != column { + return Err(CreateIndexError::IncompatibleUniqueIndexColumns)?; + } + } + } + } + } + + let check_multipart = |parts: &[Part]| -> Result<(), CreateIndexError> { + if parts.len() > 1 { + return Err(CreateIndexError::IncompatibleIndexMultipleColumns { + ty: self.ty.to_string(), + })?; + } + Ok(()) + }; + let check_part_nullability = |part: &Part| -> Result<(), CreateIndexError> { + if part.is_nullable == Some(true) { + return Err(CreateIndexError::IncompatibleNullableColumn { + ty: self.ty.to_string(), + })?; + } + Ok(()) + }; + let check_part_type = |part: &Part, + eq: bool, + types: &[Option<IndexFieldType>]| + -> Result<(), CreateIndexError> { + let ctype = part + .r#type + .as_ref() + .map(|t| t.to_string()) + .unwrap_or_else(|| "unknown".into()); + if eq { + if !types.iter().any(|t| t == &part.r#type) { + return Err(CreateIndexError::IncompatibleIndexColumnType { + ty: self.ty.to_string(), + ctype, + })?; + } + } else if types.iter().any(|t| t == &part.r#type) { + return Err(CreateIndexError::IncompatibleIndexColumnType { + ty: self.ty.to_string(), + ctype, + })?; + } + Ok(()) + }; + let parts = self.parts(storage)?; + match self.ty { + IndexType::Bitset => { + check_multipart(&parts)?; + for part in &parts { + check_part_nullability(part)?; + check_part_type( + part, + true, + &[ + Some(IndexFieldType::Unsigned), + Some(IndexFieldType::String), + Some(IndexFieldType::Varbinary), + ], + )?; + } + } + IndexType::Rtree => { + check_multipart(&parts)?; + for part in &parts { + check_part_nullability(part)?; + check_part_type(part, true, &[Some(IndexFieldType::Array)])?; + } + } + IndexType::Hash => { + for part in &parts { + check_part_nullability(part)?; + check_part_type( + part, + false, + &[Some(IndexFieldType::Array), Some(IndexFieldType::Datetime)], + )?; + } + } + IndexType::Tree => { + for part in &parts { + check_part_type(part, false, &[Some(IndexFieldType::Array)])?; + } + } + } + + let index = with_su(ADMIN_ID, || { + storage.indexes.by_name(table.id, self.name.clone()) + })??; + if let Some(index) = index { + if
self.is_duplicate(table.id, index) { + return Err(CreateIndexError::IndexAlreadyExists { + name: self.name.clone(), + })?; + } + } + + Ok(()) + } +} + #[derive(Clone, Debug, LuaRead)] pub struct CreateTableParams { pub(crate) id: Option<SpaceId>, diff --git a/src/sql.rs b/src/sql.rs index c6f28d8ae7b32579e993281c00620efde594c841..031fb2f618cbfb4f23060c2d89f82e85d5304b55 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -2,9 +2,10 @@ use crate::access_control::UserMetadataKind; use crate::schema::{ - wait_for_ddl_commit, CreateProcParams, CreateTableParams, DistributionParam, Field, - PrivilegeDef, PrivilegeType, RenameRoutineParams, RoutineDef, RoutineLanguage, RoutineParamDef, - RoutineParams, RoutineSecurity, SchemaObjectType, ShardingFn, UserDef, ADMIN_ID, + wait_for_ddl_commit, CreateIndexParams, CreateProcParams, CreateTableParams, DistributionParam, + Field, IndexOption, PrivilegeDef, PrivilegeType, RenameRoutineParams, RoutineDef, + RoutineLanguage, RoutineParamDef, RoutineParams, RoutineSecurity, SchemaObjectType, ShardingFn, + UserDef, ADMIN_ID, }; use crate::sql::pgproto::{ with_portals_mut, Portal, PortalDescribe, Statement, StatementDescribe, UserPortalNames, @@ -899,6 +900,61 @@ fn reenterable_schema_change_request( // Check parameters let params = match ir_node { + IrNode::Ddl(Ddl::CreateIndex { + name, + table_name, + columns, + unique, + index_type, + bloom_fpr, + page_size, + range_size, + run_count_per_level, + run_size_ratio, + dimension, + distance, + hint, + .. + }) => { + let mut opts: Vec<IndexOption> = Vec::with_capacity(9); + opts.push(IndexOption::Unique(unique)); + if let Some(bloom_fpr) = bloom_fpr { + opts.push(IndexOption::BloomFalsePositiveRate(bloom_fpr)); + } + if let Some(page_size) = page_size { + opts.push(IndexOption::PageSize(page_size)); + } + if let Some(range_size) = range_size { + opts.push(IndexOption::RangeSize(range_size)); + } + if let Some(run_count_per_level) = run_count_per_level { + opts.push(IndexOption::RunCountPerLevel(run_count_per_level)); + } + if let Some(run_size_ratio) = run_size_ratio { + opts.push(IndexOption::RunSizeRatio(run_size_ratio)); + } + if let Some(dimension) = dimension { + opts.push(IndexOption::Dimension(dimension)); + } + if let Some(distance) = distance { + opts.push(IndexOption::Distance(distance)); + } + if let Some(hint) = hint { + opts.push(IndexOption::Hint(hint)); + } + opts.shrink_to_fit(); + + let params = CreateIndexParams { + name, + space_name: table_name, + columns, + ty: index_type, + opts, + owner: current_user, + }; + params.validate(storage)?; + Params::CreateIndex(params) + } IrNode::Ddl(Ddl::CreateProc { name, params: args, @@ -1072,6 +1128,17 @@ fn reenterable_schema_change_request( // Check for conflicts and make the op let op = match ¶ms { + Params::CreateIndex(params) => { + if params.index_exists() { + // Index already exists, no op needed. + return Ok(ConsumerResult { row_count: 0 }); + } + let ddl = params.into_ddl(storage)?; + Op::DdlPrepare { + schema_version, + ddl, + } + } Params::CreateProcedure(params) => { if params.func_exists() { // Function already exists, no op needed. 
@@ -1477,6 +1544,7 @@ fn reenterable_schema_change_request( RenameRoutine(RenameRoutineParams), CreateProcedure(CreateProcParams), DropProcedure(String, Option<Vec<ParamDef>>), + CreateIndex(CreateIndexParams), } } diff --git a/src/sql/pgproto.rs b/src/sql/pgproto.rs index d5d20325a5058d08ef34260a889c33dfe2831cc3..330c77c722e6c4045d759af1d2860c22b303cfdb 100644 --- a/src/sql/pgproto.rs +++ b/src/sql/pgproto.rs @@ -583,6 +583,7 @@ pub enum QueryType { pub enum CommandTag { AlterRole = 0, CallProcedure = 16, + CreateIndex = 18, CreateProcedure = 14, CreateRole = 1, CreateTable = 2, @@ -614,6 +615,7 @@ impl From<CommandTag> for QueryType { | CommandTag::RevokeRole => QueryType::Acl, CommandTag::DropTable | CommandTag::CreateTable + | CommandTag::CreateIndex | CommandTag::CreateProcedure | CommandTag::RenameRoutine | CommandTag::DropProcedure => QueryType::Ddl, @@ -654,6 +656,7 @@ impl TryFrom<&Node> for CommandTag { Ddl::CreateProc { .. } => Ok(CommandTag::CreateProcedure), Ddl::DropProc { .. } => Ok(CommandTag::DropProcedure), Ddl::RenameRoutine { .. } => Ok(CommandTag::RenameRoutine), + Ddl::CreateIndex { .. } => Ok(CommandTag::CreateIndex), }, Node::Relational(rel) => match rel { Relational::Delete { .. } => Ok(CommandTag::Delete), diff --git a/src/storage.rs b/src/storage.rs index b8aafc3b4e19c625ed1c7d317d9943d1092ec159..2f82e85b5c4aa6877d4c96b02bde012ce7b0f062 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,4 +1,5 @@ use tarantool::auth::AuthDef; +use tarantool::decimal::Decimal; use tarantool::error::{Error as TntError, TarantoolErrorCode as TntErrorCode}; use tarantool::fiber; use tarantool::index::{Index, IndexId, IndexIterator, IndexType, IteratorType}; @@ -15,6 +16,7 @@ use tarantool::tlua::{self, LuaError}; use tarantool::tuple::KeyDef; use tarantool::tuple::{Decode, DecodeOwned, Encode}; use tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer}; +use tarantool::util::NumOrStr; use crate::access_control::{user_by_id, UserMetadataKind}; use crate::failure_domain::FailureDomain; @@ -50,6 +52,7 @@ use std::collections::{HashMap, HashSet}; use std::iter::Peekable; use std::marker::PhantomData; use std::rc::Rc; +use std::str::FromStr; use std::time::Duration; use self::acl::{on_master_drop_role, on_master_drop_user}; @@ -1347,7 +1350,7 @@ impl Properties { // Primary index id: 0, name: "key".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("key")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -1606,7 +1609,7 @@ impl Replicasets { // Primary index id: 0, name: "replicaset_id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("replicaset_id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -1672,7 +1675,7 @@ impl PeerAddresses { // Primary index id: 0, name: "raft_id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("raft_id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -1782,7 +1785,7 @@ impl Instances { // Primary index id: 0, name: "instance_id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("instance_id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -1794,7 +1797,7 @@ impl 
Instances { table_id: Self::TABLE_ID, id: 1, name: "raft_id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("raft_id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -1806,7 +1809,7 @@ impl Instances { table_id: Self::TABLE_ID, id: 2, name: "replicaset_id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(false)], parts: vec![Part::from("replicaset_id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2154,7 +2157,7 @@ impl Tables { // Primary index id: 0, name: "id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2166,7 +2169,7 @@ impl Tables { table_id: Self::TABLE_ID, id: 1, name: "name".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("name")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2273,7 +2276,7 @@ impl Indexes { // Primary index id: 0, name: "id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("table_id"), Part::from("id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2285,7 +2288,7 @@ impl Indexes { table_id: Self::TABLE_ID, id: 1, name: "name".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("table_id"), Part::from("name")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2474,25 +2477,35 @@ pub fn ddl_create_index_on_master( index_id: IndexId, ) -> traft::Result<()> { debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); - let pico_index_def = storage + let mut pico_index_def = storage .indexes .get(space_id, index_id)? .ok_or_else(|| Error::other(format!("index with id {index_id} not found")))?; - let mut opts = IndexOptions::default(); - opts.r#type = Some(pico_index_def.itype); - opts.id = Some(pico_index_def.id); + let mut opts = IndexOptions { + r#type: Some(pico_index_def.ty), + id: Some(pico_index_def.id), + ..Default::default() + }; + // We use LUA to create indexes, so we need to switch from C 0-based to LUA 1-based + // part enumeration in options. 
+ for part in pico_index_def.parts.iter_mut() { + let NumOrStr::Num(ref mut num) = part.field else { + unreachable!("index part field should be a number"); + }; + *num += 1; + } opts.parts = Some(pico_index_def.parts); + let dec_to_f32 = |d: Decimal| f32::from_str(&d.to_string()).expect("decimal to f32"); for opt in pico_index_def.opts { match opt { IndexOption::Unique(unique) => opts.unique = Some(unique), - IndexOption::IfNotExists(if_not_exists) => opts.if_not_exists = Some(if_not_exists), IndexOption::Dimension(dim) => opts.dimension = Some(dim), IndexOption::Distance(distance) => opts.distance = Some(distance), - IndexOption::BloomFalsePositiveRate(rate) => opts.bloom_fpr = Some(rate as f32), + IndexOption::BloomFalsePositiveRate(rate) => opts.bloom_fpr = Some(dec_to_f32(rate)), IndexOption::PageSize(size) => opts.page_size = Some(size), IndexOption::RangeSize(size) => opts.range_size = Some(size), IndexOption::RunCountPerLevel(count) => opts.run_count_per_level = Some(count), - IndexOption::RunSizeRatio(ratio) => opts.run_size_ratio = Some(ratio as f32), + IndexOption::RunSizeRatio(ratio) => opts.run_size_ratio = Some(dec_to_f32(ratio)), IndexOption::Hint(_) => { // FIXME: `hint` option is disabled in Tarantool module. } @@ -2768,7 +2781,7 @@ impl Users { // Primary index id: 0, name: "id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2780,7 +2793,7 @@ impl Users { table_id: Self::TABLE_ID, id: 1, name: "name".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("name")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -2903,7 +2916,7 @@ impl Privileges { // Primary index id: 0, name: "primary".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![ Part::from("grantee_id"), @@ -2920,7 +2933,7 @@ impl Privileges { table_id: Self::TABLE_ID, id: 1, name: "object".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(false)], parts: vec![Part::from("object_type"), Part::from("object_id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -3092,7 +3105,7 @@ impl Tiers { // Primary index id: 0, name: "name".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("name")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -3170,7 +3183,7 @@ impl Routines { // Primary index id: 0, name: "id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("id")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -3182,7 +3195,7 @@ impl Routines { table_id: Self::TABLE_ID, id: 1, name: "name".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: vec![Part::from("name")], // This means the local schema is already up to date and main loop doesn't need to do anything @@ -4533,8 +4546,6 @@ mod tests { #[track_caller] fn get_field_index(part: &Part, space_fields: &[tarantool::space::Field]) -> usize { - use tarantool::util::NumOrStr; - match &part.field { NumOrStr::Num(index) => { return *index as _; diff --git 
a/src/traft/node.rs b/src/traft/node.rs index 704c938151c93c8997110fabe6e660db872fdd52..3a6332781e7fcded5575916c3801b58e8e4757a6 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -1614,7 +1614,7 @@ impl NodeImpl { table_id: id, id: 0, name: "primary_key".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(true)], parts: primary_key, operable: false, @@ -1651,7 +1651,7 @@ impl NodeImpl { table_id: id, id: 1, name: "bucket_id".into(), - itype: IndexType::Tree, + ty: IndexType::Tree, opts: vec![IndexOption::Unique(false)], parts: vec![Part::field(bucket_id_index) .field_type(IFT::Unsigned) @@ -1693,7 +1693,7 @@ impl NodeImpl { space_id, index_id, name, - itype, + ty, opts, by_fields, owner, @@ -1702,7 +1702,7 @@ impl NodeImpl { table_id: space_id, id: index_id, name, - itype, + ty, opts, parts: by_fields, operable: false, diff --git a/src/traft/op.rs b/src/traft/op.rs index 919ec062609a4e524cc93e5cd3cc4199d42163c9..840bfd4e4612f27e0b96ca5ac2c1a7b4601a092f 100644 --- a/src/traft/op.rs +++ b/src/traft/op.rs @@ -670,7 +670,7 @@ pub enum Ddl { space_id: SpaceId, index_id: IndexId, name: String, - itype: IndexType, + ty: IndexType, opts: Vec<IndexOption>, by_fields: Vec<Part>, owner: UserId, diff --git a/test/conftest.py b/test/conftest.py index 95cd134a18264b2adb03d673214b90ac6b107e83..a9c35f706c4b976ddf1ed106875b48da4c7c0168 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -47,6 +47,7 @@ BASE_PORT = 3300 PORT_RANGE = 200 MAX_LOGIN_ATTEMPTS = 4 +PICO_SERVICE_ID = 32 def eprint(*args, **kwargs): diff --git a/test/int/test_audit.py b/test/int/test_audit.py index c22b21797c16790c1dda0193ca3d61c45244d1b2..c2198a7976515d0c9a4ba00576bc261a2d7fb120 100644 --- a/test/int/test_audit.py +++ b/test/int/test_audit.py @@ -323,6 +323,27 @@ def test_role(instance: Instance): assert drop_role["initiator"] == "bubba" +def test_index(instance: Instance): + instance.start() + instance.sql( + """ + create table "foo" ("val" int not null, primary key ("val")) + distributed by ("val") + """ + ) + instance.sql(""" create index "foo_idx" on "foo" ("val") """) + instance.terminate() + + events = AuditFile(instance.audit_flag_value).events() + + create_index = take_until_title(events, "create_index") + assert create_index is not None + assert create_index["name"] == "foo_idx" + assert create_index["message"] == f"created index `{create_index['name']}`" + assert create_index["severity"] == "medium" + assert create_index["initiator"] == "pico_service" + + def assert_instance_expelled(expelled_instance: Instance, instance: Instance): info = instance.call(".proc_instance_info", expelled_instance.instance_id) grades = (info["current_grade"]["variant"], info["target_grade"]["variant"]) diff --git a/test/int/test_basics.py b/test/int/test_basics.py index 2639d32b2569dcdf882b5d4dbd368dfb3b27e5ba..d592dbabd196906efcf5c4a9b330a3bfafb4b148 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -426,7 +426,7 @@ Insert({_pico_user}, [31,"super",0,null,1,"role"]))| Insert({_pico_table}, [{_pico_table},"_pico_table",["global"],[["id","unsigned",false],["name","string",false],["distribution","array",false],["format","array",false],["schema_version","unsigned",false],["operable","boolean",false],["engine","string",false],["owner","unsigned",false]],0,true,"memtx",1]), Insert({_pico_index}, [{_pico_table},0,"id","tree",[{{"unique":true}}],[["id",null,null,null,null]],true,0,1]), Insert({_pico_index}, 
[{_pico_table},1,"name","tree",[{{"unique":true}}],[["name",null,null,null,null]],true,0,1]), -Insert({_pico_table}, [{_pico_index},"_pico_index",["global"],[["table_id","unsigned",false],["id","unsigned",false],["name","string",false],["itype","string",false],["opts","array",false],["parts","array",false],["operable","boolean",false],["schema_version","unsigned",false],["owner","unsigned",false]],0,true,"memtx",1]), +Insert({_pico_table}, [{_pico_index},"_pico_index",["global"],[["table_id","unsigned",false],["id","unsigned",false],["name","string",false],["type","string",false],["opts","array",false],["parts","array",false],["operable","boolean",false],["schema_version","unsigned",false],["owner","unsigned",false]],0,true,"memtx",1]), Insert({_pico_index}, [{_pico_index},0,"id","tree",[{{"unique":true}}],[["table_id",null,null,null,null],["id",null,null,null,null]],true,0,1]), Insert({_pico_index}, [{_pico_index},1,"name","tree",[{{"unique":true}}],[["table_id",null,null,null,null],["name",null,null,null,null]],true,0,1]), Insert({_pico_table}, [{_pico_peer_address},"_pico_peer_address",["global"],[["raft_id","unsigned",false],["address","string",false]],0,true,"memtx",1]), diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py index eaa4b4214be215271fd85565e692fb7c9cff55eb..6fbaec57ae5926931a0aea0730b3c9e88abed10a 100644 --- a/test/int/test_ddl.py +++ b/test/int/test_ddl.py @@ -1,9 +1,6 @@ import pytest import time -from conftest import Cluster, ReturnError - - -PICO_SERVICE_ID = 32 +from conftest import PICO_SERVICE_ID, Cluster, ReturnError def test_ddl_abort(cluster: Cluster): @@ -83,7 +80,7 @@ def test_ddl_lua_api(cluster: Cluster): ) ) space_id = 1027 - initiator_id = 32 # pico_service + initiator_id = PICO_SERVICE_ID pico_space_def = [ space_id, "space 2", @@ -280,7 +277,7 @@ def test_ddl_create_table_bulky(cluster: Cluster): assert i4.next_schema_version() == 3 # Space was created and is operable - initiator_id = 32 # pico_service + initiator_id = PICO_SERVICE_ID pico_space_def = [ space_id, "stuff", @@ -392,7 +389,7 @@ def test_ddl_create_sharded_space(cluster: Cluster): ############################################################################ # Space was created and is operable - initiator_id = 32 # pico_service + initiator_id = PICO_SERVICE_ID pico_space_def = [ space_id, "stuff", @@ -739,7 +736,7 @@ def test_ddl_create_table_from_snapshot_at_boot(cluster: Cluster): ), ) - initiator_id = 32 # pico_service, + initiator_id = PICO_SERVICE_ID tt_space_def = [ space_id, initiator_id, @@ -823,7 +820,7 @@ def test_ddl_create_table_from_snapshot_at_catchup(cluster: Cluster): i1.raft_wait_index(index) i2.raft_wait_index(index) - initiator_id = 32 # pico_service + initiator_id = PICO_SERVICE_ID tt_space_def = [ space_id, initiator_id, diff --git a/test/int/test_sql.py b/test/int/test_sql.py index 3c49931afb40a16fdaf6ecfd8af1ab46b97ff56d..17fdbc010e1aa59dcad500597cc3f6051abb2d82 100644 --- a/test/int/test_sql.py +++ b/test/int/test_sql.py @@ -3123,3 +3123,128 @@ box error: AccessDenied: Alter access to user 'boba' is denied for user 'biba'.\ # Check that we can create a new user "boba" after rename him i1.create_user(with_name=boba, with_password=password) assert boba in names_from_pico_user_table() + + +def test_index(cluster: Cluster): + cluster.deploy(instance_count=1) + i1 = cluster.instances[0] + + # Sharded memtx table + ddl = i1.sql( + """ + create table t (a text not null, b text not null, c text, primary key (a)) + using memtx + distributed by (a) + option (timeout = 3) + 
""" + ) + assert ddl["row_count"] == 1 + + # Global table + ddl = i1.sql( + """ + create table g (a int not null, b text not null, c text, primary key (a)) + distributed globally + option (timeout = 3) + """ + ) + assert ddl["row_count"] == 1 + + # Sharded vinyl table + ddl = i1.sql( + """ + create table v (a int not null, b text not null, c text, primary key (a)) + using vinyl + distributed by (a) + option (timeout = 3) + """ + ) + + # Check that created index appears in _pico_index table. + ddl = i1.sql(""" create index i0 on t (a) """) + assert ddl["row_count"] == 1 + data = i1.sql(""" select * from "_pico_index" where "name" = 'I0' """) + assert data["rows"] != [] + + # Successful tree index creation with default options + ddl = i1.sql(""" create index i1 on t (a, b) """) + assert ddl["row_count"] == 1 + + # Unique index can be created only on the sharding key for sharded tables. + invalid_unique = ( + "unique index for the sharded table must duplicate its sharding key columns" + ) + with pytest.raises(ReturnError, match=invalid_unique): + i1.sql(""" create unique index i2 on t using tree (b) """) + + # Successful unique tree index creation on the sharding key. + ddl = i1.sql(""" create unique index i2 on t using tree (a) """) + assert ddl["row_count"] == 1 + + # No restrictions on unique index for globally distributed tables. + ddl = i1.sql(""" create unique index i3 on g using tree (b) """) + + # Successful create a tree index with corresponding options. + ddl = i1.sql(""" create index i4 on t using tree (c) with (hint = true) """) + assert ddl["row_count"] == 1 + + # Fail to create a tree index with wrong options. + invalid_tree_option = "index type tree does not support option" + with pytest.raises(ReturnError, match=invalid_tree_option): + i1.sql(""" create index i5 on t using tree (c) with (distance = euclid) """) + with pytest.raises(ReturnError, match=invalid_tree_option): + i1.sql(""" create index i6 on t using tree (c) with (dimension = 42) """) + + # RTree indexes can't be created via SQL at the moment as they require array columns + # that are not supported yet. + non_array_rtree = "index type rtree does not support column type" + with pytest.raises(ReturnError, match=non_array_rtree): + i1.sql(""" create index i11 on t using rtree (b) """) + + # Fail to create an rtree index from nullable columns. + nullable_rtree = "index type rtree does not support nullable columns" + with pytest.raises(ReturnError, match=nullable_rtree): + i1.sql(""" create index i12 on t using rtree (c) """) + + # Fail to create an rtree index with wrong options. + invalid_rtree_option = "index type rtree does not support option" + with pytest.raises(ReturnError, match=invalid_rtree_option): + i1.sql(""" create index i13 on t using rtree (b) with (hint = true) """) + + # Successful bitset index creation. + ddl = i1.sql(""" create index i14 on t using bitset (b) """) + + # Fail to create a bitset index from nullable columns. + nullable_bitset = "index type bitset does not support nullable columns" + with pytest.raises(ReturnError, match=nullable_bitset): + i1.sql(""" create index i15 on t using bitset (c) """) + + # Fail to create unique bitset index. + unique_bitset = "index type bitset does not support unique indexes" + with pytest.raises(ReturnError, match=unique_bitset): + i1.sql(""" create unique index i16 on t using bitset (a) """) + + # Fail to create bitset index with column types other then string, number or varbinary. 
+ invalid_bitset = "index type bitset does not support column type" + with pytest.raises(ReturnError, match=invalid_bitset): + i1.sql(""" create index i17 on v using bitset (a) """) + + # Successful hash index creation. + ddl = i1.sql(""" create unique index i17 on t using hash (a) """) + assert ddl["row_count"] == 1 + + # Fail to create a non-unique hash index. + non_unique_hash = "index type hash does not support non-unique indexes" + with pytest.raises(ReturnError, match=non_unique_hash): + i1.sql(""" create index i18 on t using hash (c) """) + + # Fail to create an index on a memtx table with vinyl options. + invalid_memtx = "table engine memtx does not support option" + with pytest.raises(ReturnError, match=invalid_memtx): + i1.sql(""" create index i7 on t (b) with (page_size = 42) """) + with pytest.raises(ReturnError, match=invalid_memtx): + i1.sql(""" create index i8 on t (b) with (range_size = 42) """) + with pytest.raises(ReturnError, match=invalid_memtx): + i1.sql(""" create index i9 on t (b) with (run_count_per_level = 42) """) + with pytest.raises(ReturnError, match=invalid_memtx): + i1.sql(""" create index i10 on t (b) with (run_size_ratio = 0.1) """)
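For reference, a minimal usage sketch of the CREATE INDEX flow this patch introduces, written in the pytest style of test/int/test_sql.py above. It is illustrative only and not part of the patch; the table name `w`, the index name `w_idx`, and the function name are made up for the example, while the `Cluster` fixture, the SQL statements, and the `_pico_index` lookup mirror the existing tests.

def test_index_usage_sketch(cluster: Cluster):
    # Single-instance cluster, as in test_index above.
    cluster.deploy(instance_count=1)
    i1 = cluster.instances[0]

    # Create a sharded memtx table.
    ddl = i1.sql(
        """
        create table w (a text not null, b text not null, primary key (a))
        using memtx
        distributed by (a)
        option (timeout = 3)
        """
    )
    assert ddl["row_count"] == 1

    # Create a secondary index on it (no USING clause, as in the tests above,
    # which describe this as tree index creation with default options).
    ddl = i1.sql(""" create index w_idx on w (b) """)
    assert ddl["row_count"] == 1

    # The definition is recorded in _pico_index; note that the column formerly
    # named "itype" is now named "type" after this patch. Unquoted identifiers
    # are uppercased, hence the 'W_IDX' literal.
    data = i1.sql(""" select * from "_pico_index" where "name" = 'W_IDX' """)
    assert data["rows"] != []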