From 6043629fc06b8e5953ca16318adfcaa5f273d6c9 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Wed, 27 Mar 2024 17:52:48 +0700
Subject: [PATCH] feat: implement create index

---
 CHANGELOG.md            |   4 -
 src/cas.rs              |   2 +-
 src/schema.rs           | 421 ++++++++++++++++++++++++++++++++++++++--
 src/sql.rs              |  74 ++++++-
 src/sql/pgproto.rs      |   3 +
 src/storage.rs          |  63 +++---
 src/traft/node.rs       |   8 +-
 src/traft/op.rs         |   2 +-
 test/conftest.py        |   1 +
 test/int/test_audit.py  |  21 ++
 test/int/test_basics.py |   2 +-
 test/int/test_ddl.py    |  15 +-
 test/int/test_sql.py    | 125 ++++++++++++
 13 files changed, 673 insertions(+), 68 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2aa49f732..80867aa0b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -33,11 +33,7 @@ with the `YY.MINOR.MICRO` scheme.
     - `instance.log_level` -> `instance.log.level`
     - `instance.memtx_memory` -> `instance.memtx.memory`
 
-=======
 - Change _pico_index table structure.
-
-### SQL
-
 - Support index creation with SQL.
 -->
 
diff --git a/src/cas.rs b/src/cas.rs
index d5c460cf5c..a0969dfdb9 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -954,7 +954,7 @@ mod tests {
             space_id,
             index_id,
             name: "index1".into(),
-            itype: IndexType::Tree,
+            ty: IndexType::Tree,
             opts: vec![IndexOption::Unique(true)],
             by_fields: vec![],
             owner: ADMIN_ID,
diff --git a/src/schema.rs b/src/schema.rs
index 8296c4238e..e7caff5d07 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -1,9 +1,12 @@
+use ahash::{AHashMap, AHashSet};
 use sbroad::ir::ddl::{Language, ParamDef};
 use std::borrow::Cow;
 use std::collections::{BTreeMap, HashSet};
 use std::fmt::Display;
+use std::str::FromStr;
 use std::time::Duration;
-use tarantool::index::{IndexType, RtreeIndexDistanceType};
+use tarantool::decimal::Decimal;
+use tarantool::index::{FieldType as IndexFieldType, IndexType, Part, RtreeIndexDistanceType};
 
 use tarantool::auth::AuthData;
 use tarantool::auth::AuthDef;
@@ -17,13 +20,13 @@ use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
 use tarantool::space::{Metadata as SpaceMetadata, Space, SpaceType, SystemSpace};
 use tarantool::transaction::{transaction, TransactionError};
 use tarantool::{
+    index::IndexId,
     index::IteratorType,
     index::Metadata as IndexMetadata,
-    index::{IndexId, Part},
     space::SpaceId,
     tlua::{self, LuaRead},
     tuple::Encode,
-    util::Value,
+    util::{NumOrStr, Value},
 };
 
 use sbroad::ir::value::Value as IrValue;
@@ -217,42 +220,102 @@ fn default_bucket_id_field() -> String {
 // IndexDef
 ////////////////////////////////////////////////////////////////////////////////
 
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
+/// Picodata index options.
+///
+/// While tarantool module already includes an `IndexOptions` structure
+/// for local indexes, the current enum is designed for picodata ones.
+/// Currently, these options coincide, but we intend to introduce a
+/// REDISTRIBUTION option for secondary indexes. It will be implemented
+/// using materialized views instead of tarantool indexes. Therefore,
+/// it's important to maintain a distinction between these features,
+/// emphasizing that it is specific to picodata.
+#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq)]
 pub enum IndexOption {
-    BloomFalsePositiveRate(f64),
+    /// Vinyl only. The false positive rate for the bloom filter.
+    #[serde(rename = "bloom_fpr")]
+    BloomFalsePositiveRate(Decimal),
+    /// The RTREE index dimension.
+    #[serde(rename = "dimension")]
     Dimension(u32),
+    /// The RTREE index distance type.
+    #[serde(rename = "distance")]
     Distance(RtreeIndexDistanceType),
+    /// Specify whether hint optimization is enabled for the TREE index.
+    /// If true, the index works faster, if false, the index size is reduced by half.
+    #[serde(rename = "hint")]
+    // FIXME: this option is disabled in the current version of the module.
     Hint(bool),
-    IfNotExists(bool),
+    /// Vinyl only. The page size in bytes used for read and write disk operations.
+    #[serde(rename = "page_size")]
     PageSize(u32),
+    /// Vinyl only. The range size in bytes used for vinyl index.
+    #[serde(rename = "range_size")]
     RangeSize(u32),
+    /// Vinyl only. The number of runs per level in the LSM tree.
+    #[serde(rename = "run_count_per_level")]
     RunCountPerLevel(u32),
-    RunSizeRatio(f64),
+    /// Vinyl only. The ratio between the size of different levels in the LSM tree.
+    #[serde(rename = "run_size_ratio")]
+    RunSizeRatio(Decimal),
+    /// Specify whether the index is unique. When true, the index cannot contain duplicate values.
+    #[serde(rename = "unique")]
     Unique(bool),
 }
 
-// We can safely derive `Eq` because `f64` is never `NaN`.
-impl Eq for IndexOption {}
-
 impl IndexOption {
     pub fn as_kv(&self) -> (Cow<'static, str>, Value<'_>) {
+        let dec_to_f64 = |d: Decimal| f64::from_str(&d.to_string()).expect("decimal to f64");
         match self {
-            IndexOption::BloomFalsePositiveRate(rate) => ("bloom_fpr".into(), Value::Double(*rate)),
+            IndexOption::BloomFalsePositiveRate(rate) => {
+                ("bloom_fpr".into(), Value::Double(dec_to_f64(*rate)))
+            }
             IndexOption::Dimension(dim) => ("dimension".into(), Value::Num(*dim)),
             IndexOption::Distance(dist) => ("distance".into(), Value::Str(dist.as_str().into())),
             IndexOption::Hint(hint) => ("hint".into(), Value::Bool(*hint)),
-            IndexOption::IfNotExists(if_not_exists) => {
-                ("if_not_exists".into(), Value::Bool(*if_not_exists))
-            }
             IndexOption::PageSize(size) => ("page_size".into(), Value::Num(*size)),
             IndexOption::RangeSize(size) => ("range_size".into(), Value::Num(*size)),
             IndexOption::RunCountPerLevel(count) => {
                 ("run_count_per_level".into(), Value::Num(*count))
             }
-            IndexOption::RunSizeRatio(ratio) => ("run_size_ratio".into(), Value::Double(*ratio)),
+            IndexOption::RunSizeRatio(ratio) => {
+                ("run_size_ratio".into(), Value::Double(dec_to_f64(*ratio)))
+            }
             IndexOption::Unique(unique) => ("unique".into(), Value::Bool(*unique)),
         }
     }
+
+    pub fn is_vinyl(&self) -> bool {
+        matches!(
+            self,
+            IndexOption::BloomFalsePositiveRate(_)
+                | IndexOption::PageSize(_)
+                | IndexOption::RangeSize(_)
+                | IndexOption::RunCountPerLevel(_)
+                | IndexOption::RunSizeRatio(_)
+        )
+    }
+
+    pub fn is_rtree(&self) -> bool {
+        matches!(self, IndexOption::Dimension(_) | IndexOption::Distance(_))
+    }
+
+    pub fn is_tree(&self) -> bool {
+        matches!(self, IndexOption::Hint(_))
+    }
+
+    pub fn type_name(&self) -> &str {
+        match self {
+            IndexOption::BloomFalsePositiveRate(_) => "bloom_fpr",
+            IndexOption::Dimension(_) => "dimension",
+            IndexOption::Distance(_) => "distance",
+            IndexOption::Hint(_) => "hint",
+            IndexOption::PageSize(_) => "page_size",
+            IndexOption::RangeSize(_) => "range_size",
+            IndexOption::RunCountPerLevel(_) => "run_count_per_level",
+            IndexOption::RunSizeRatio(_) => "run_size_ratio",
+            IndexOption::Unique(_) => "unique",
+        }
+    }
 }
 
 /// Database index definition.
@@ -263,7 +326,8 @@ pub struct IndexDef {
     pub table_id: SpaceId,
     pub id: IndexId,
     pub name: String,
-    pub itype: IndexType,
+    #[serde(rename = "type")]
+    pub ty: IndexType,
     pub opts: Vec<IndexOption>,
     pub parts: Vec<Part>,
     pub operable: bool,
@@ -285,7 +349,7 @@ impl IndexDef {
             Field::from(("table_id", FieldType::Unsigned)),
             Field::from(("id", FieldType::Unsigned)),
             Field::from(("name", FieldType::String)),
-            Field::from(("itype", FieldType::String)),
+            Field::from(("type", FieldType::String)),
             Field::from(("opts", FieldType::Array)),
             Field::from(("parts", FieldType::Array)),
             Field::from(("operable", FieldType::Boolean)),
@@ -301,7 +365,7 @@ impl IndexDef {
             table_id: 10569,
             id: 1,
             name: "secondary".into(),
-            itype: IndexType::Tree,
+            ty: IndexType::Tree,
             opts: vec![],
             parts: vec![],
             operable: true,
@@ -320,13 +384,26 @@ impl IndexDef {
             space_id: self.table_id,
             index_id: self.id,
             name: self.name.as_str().into(),
-            r#type: self.itype,
+            r#type: self.ty,
             opts,
             parts: self.parts.clone(),
         };
 
         index_meta
     }
+
+    pub fn opts_equal(&self, opts: &[IndexOption]) -> bool {
+        let mut set: AHashSet<&IndexOption> = AHashSet::with_capacity(opts.len());
+        for opt in &self.opts {
+            set.insert(opt);
+        }
+        for opt in opts {
+            if !set.contains(opt) {
+                return false;
+            }
+        }
+        true
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1321,6 +1398,8 @@ pub enum DdlError {
     NoPendingDdl,
     #[error("{0}")]
     CreateRoutine(#[from] CreateRoutineError),
+    #[error("{0}")]
+    CreateIndex(#[from] CreateIndexError),
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -1349,6 +1428,40 @@ impl From<CreateTableError> for Error {
     }
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum CreateIndexError {
+    #[error("index {name} already exists with a different signature")]
+    IndexAlreadyExists { name: String },
+    #[error("table {table_name} not found")]
+    TableNotFound { table_name: String },
+    #[error("table {table_name} is not operable")]
+    TableNotOperable { table_name: String },
+    #[error("no field with name: {name}")]
+    FieldUndefined { name: String },
+    #[error("table engine {engine} does not support option {option}")]
+    IncompatibleTableEngineOption { engine: String, option: String },
+    #[error("index type {ty} does not support option {option}")]
+    IncompatibleIndexTypeOption { ty: String, option: String },
+    #[error("index type {ty} does not support column type {ctype}")]
+    IncompatibleIndexColumnType { ty: String, ctype: String },
+    #[error("index type {ty} does not support nullable columns")]
+    IncompatipleNullableColumn { ty: String },
+    #[error("unique index for the sharded table must duplicate its sharding key columns")]
+    IncompatibleUniqueIndexColumns,
+    #[error("index type {ty} does not support unique indexes")]
+    UniqueIndexType { ty: String },
+    #[error("index type {ty} does not support non-unique indexes")]
+    NonUniqueIndexType { ty: String },
+    #[error("index type {ty} does not support multiple columns")]
+    IncompatibleIndexMultipleColumns { ty: String },
+}
+
+impl From<CreateIndexError> for Error {
+    fn from(err: CreateIndexError) -> Self {
+        DdlError::CreateIndex(err).into()
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum CreateRoutineError {
     #[error("routine {name} already exists with a different kind")]
@@ -1468,6 +1581,276 @@ impl CreateProcParams {
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct CreateIndexParams {
+    pub(crate) name: String,
+    pub(crate) space_name: String,
+    pub(crate) columns: Vec<String>,
+    pub(crate) ty: IndexType,
+    pub(crate) opts: Vec<IndexOption>,
+    pub(crate) owner: UserId,
+}
+
+impl CreateIndexParams {
+    pub fn index_exists(&self) -> bool {
+        let sys_index = Space::from(SystemSpace::Index);
+        let name_idx = sys_index
+            .index_cached("name")
+            .expect("_index should have an index by table_id and name");
+        let Some(space) = Space::find_cached(&self.space_name) else {
+            return false;
+        };
+        let idx = name_idx
+            .get(&(&space.id(), &self.name))
+            .expect("reading from _index shouldn't fail");
+        idx.is_some()
+    }
+
+    pub fn table(&self, storage: &Clusterwide) -> traft::Result<TableDef> {
+        let table = with_su(ADMIN_ID, || storage.tables.by_name(&self.space_name))??;
+        let Some(table) = table else {
+            return Err(CreateIndexError::TableNotFound {
+                table_name: self.space_name.clone(),
+            })?;
+        };
+        Ok(table)
+    }
+
+    pub fn next_index_id(&self) -> traft::Result<IndexId> {
+        let Some(space) = Space::find_cached(&self.space_name) else {
+            return Err(CreateIndexError::TableNotFound {
+                table_name: self.space_name.clone(),
+            })?;
+        };
+        // The logic is taken from the box.schema.index.create function
+        // (box/lua/schema.lua).
+        let sys_vindex = Space::from(SystemSpace::VIndex);
+        let mut iter = sys_vindex
+            .primary_key()
+            .select(IteratorType::LE, &[space.id()])?;
+        let Some(tuple) = iter.next() else {
+            panic!("no primary key found for space {}", self.space_name);
+        };
+        let id: IndexId = tuple.get(1).expect("index id should be present");
+        Ok(id + 1)
+    }
+
+    pub fn parts(&self, storage: &Clusterwide) -> traft::Result<Vec<Part>> {
+        let table = self.table(storage)?;
+        let mut parts = Vec::with_capacity(self.columns.len());
+
+        type Position = u32;
+        let mut positions: AHashMap<&str, Position> = AHashMap::with_capacity(table.format.len());
+        for (i, field) in table.format.iter().enumerate() {
+            positions.insert(field.name.as_str(), i as Position);
+        }
+        for column_name in &self.columns {
+            let position = positions.get(column_name.as_str()).cloned();
+            let position = position.ok_or_else(|| CreateIndexError::FieldUndefined {
+                name: column_name.clone(),
+            })?;
+            let column = &table.format[position as usize];
+            let index_field_type = try_space_field_type_to_index_field_type(column.field_type)
+                .ok_or_else(|| CreateIndexError::IncompatibleIndexColumnType {
+                    ty: self.ty.to_string(),
+                    ctype: column.field_type.to_string(),
+                })?;
+            let part = Part {
+                field: NumOrStr::Num(position),
+                r#type: Some(index_field_type),
+                collation: None,
+                is_nullable: Some(column.is_nullable),
+                path: None,
+            };
+            parts.push(part);
+        }
+        Ok(parts)
+    }
+
+    fn is_duplicate(&self, table_id: SpaceId, index: IndexDef) -> bool {
+        table_id == index.table_id
+            && self.name == index.name
+            && self.ty == index.ty
+            && index.opts_equal(self.opts.as_slice())
+    }
+
+    pub fn into_ddl(&self, storage: &Clusterwide) -> Result<Ddl, Error> {
+        let table = self.table(storage)?;
+        let by_fields = self.parts(storage)?;
+        let index_id = self.next_index_id()?;
+        let ddl = Ddl::CreateIndex {
+            space_id: table.id,
+            index_id,
+            name: self.name.clone(),
+            ty: self.ty,
+            by_fields,
+            opts: self.opts.clone(),
+            owner: self.owner,
+        };
+        Ok(ddl)
+    }
+
+    pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
+        let table = self.table(storage)?;
+        if !table.operable {
+            return Err(CreateIndexError::TableNotOperable {
+                table_name: self.space_name.clone(),
+            })?;
+        }
+        let mut filed_names: AHashSet<&str> = AHashSet::with_capacity(table.format.len());
+        for field in &table.format {
+            filed_names.insert(field.name.as_str());
+        }
+        for column in &self.columns {
+            if !filed_names.contains(column.as_str()) {
+                return Err(CreateIndexError::FieldUndefined {
+                    name: column.clone(),
+                })?;
+            }
+        }
+
+        for opt in &self.opts {
+            if table.engine != SpaceEngineType::Vinyl && opt.is_vinyl() {
+                return Err(CreateIndexError::IncompatibleTableEngineOption {
+                    engine: table.engine.to_string(),
+                    option: opt.type_name().into(),
+                })?;
+            }
+            if self.ty != IndexType::Rtree && opt.is_rtree() {
+                return Err(CreateIndexError::IncompatibleIndexTypeOption {
+                    ty: self.ty.to_string(),
+                    option: opt.type_name().into(),
+                })?;
+            }
+            if self.ty != IndexType::Tree && opt.is_tree() {
+                return Err(CreateIndexError::IncompatibleIndexTypeOption {
+                    ty: self.ty.to_string(),
+                    option: opt.type_name().into(),
+                })?;
+            }
+            if let &IndexOption::Unique(false) = opt {
+                if self.ty == IndexType::Hash {
+                    return Err(CreateIndexError::NonUniqueIndexType {
+                        ty: self.ty.to_string(),
+                    })?;
+                }
+            }
+            if let &IndexOption::Unique(true) = opt {
+                if self.ty == IndexType::Rtree || self.ty == IndexType::Bitset {
+                    return Err(CreateIndexError::UniqueIndexType {
+                        ty: self.ty.to_string(),
+                    })?;
+                }
+                // Unique index for the sharded table must duplicate its sharding key columns.
+                if let Distribution::ShardedImplicitly { sharding_key, .. } = &table.distribution {
+                    if sharding_key.len() != self.columns.len() {
+                        return Err(CreateIndexError::IncompatibleUniqueIndexColumns)?;
+                    }
+                    for (sharding_key, column) in sharding_key.iter().zip(&self.columns) {
+                        if sharding_key != column {
+                            return Err(CreateIndexError::IncompatibleUniqueIndexColumns)?;
+                        }
+                    }
+                }
+            }
+        }
+
+        let check_multipart = |parts: &[Part]| -> Result<(), CreateIndexError> {
+            if parts.len() > 1 {
+                return Err(CreateIndexError::IncompatibleIndexMultipleColumns {
+                    ty: self.ty.to_string(),
+                })?;
+            }
+            Ok(())
+        };
+        let check_part_nullability = |part: &Part| -> Result<(), CreateIndexError> {
+            if part.is_nullable == Some(true) {
+                return Err(CreateIndexError::IncompatipleNullableColumn {
+                    ty: self.ty.to_string(),
+                })?;
+            }
+            Ok(())
+        };
+        let check_part_type = |part: &Part,
+                               eq: bool,
+                               types: &[Option<IndexFieldType>]|
+         -> Result<(), CreateIndexError> {
+            let ctype = part
+                .r#type
+                .as_ref()
+                .map(|t| t.to_string())
+                .unwrap_or_else(|| "unknown".into());
+            if eq {
+                if !types.iter().any(|t| t == &part.r#type) {
+                    return Err(CreateIndexError::IncompatibleIndexColumnType {
+                        ty: self.ty.to_string(),
+                        ctype,
+                    })?;
+                }
+            } else if types.iter().any(|t| t == &part.r#type) {
+                return Err(CreateIndexError::IncompatibleIndexColumnType {
+                    ty: self.ty.to_string(),
+                    ctype,
+                })?;
+            }
+            Ok(())
+        };
+        let parts = self.parts(storage)?;
+        match self.ty {
+            IndexType::Bitset => {
+                check_multipart(&parts)?;
+                for part in &parts {
+                    check_part_nullability(part)?;
+                    check_part_type(
+                        part,
+                        true,
+                        &[
+                            Some(IndexFieldType::Unsigned),
+                            Some(IndexFieldType::String),
+                            Some(IndexFieldType::Varbinary),
+                        ],
+                    )?;
+                }
+            }
+            IndexType::Rtree => {
+                check_multipart(&parts)?;
+                for part in &parts {
+                    check_part_nullability(part)?;
+                    check_part_type(part, true, &[Some(IndexFieldType::Array)])?;
+                }
+            }
+            IndexType::Hash => {
+                for part in &parts {
+                    check_part_nullability(part)?;
+                    check_part_type(
+                        part,
+                        false,
+                        &[Some(IndexFieldType::Array), Some(IndexFieldType::Datetime)],
+                    )?;
+                }
+            }
+            IndexType::Tree => {
+                for part in &parts {
+                    check_part_type(part, false, &[Some(IndexFieldType::Array)])?;
+                }
+            }
+        }
+
+        let index = with_su(ADMIN_ID, || {
+            storage.indexes.by_name(table.id, self.name.clone())
+        })??;
+        if let Some(index) = index {
+            if self.is_duplicate(table.id, index) {
+                return Err(CreateIndexError::IndexAlreadyExists {
+                    name: self.name.clone(),
+                })?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
 #[derive(Clone, Debug, LuaRead)]
 pub struct CreateTableParams {
     pub(crate) id: Option<SpaceId>,
diff --git a/src/sql.rs b/src/sql.rs
index c6f28d8ae7..031fb2f618 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -2,9 +2,10 @@
 
 use crate::access_control::UserMetadataKind;
 use crate::schema::{
-    wait_for_ddl_commit, CreateProcParams, CreateTableParams, DistributionParam, Field,
-    PrivilegeDef, PrivilegeType, RenameRoutineParams, RoutineDef, RoutineLanguage, RoutineParamDef,
-    RoutineParams, RoutineSecurity, SchemaObjectType, ShardingFn, UserDef, ADMIN_ID,
+    wait_for_ddl_commit, CreateIndexParams, CreateProcParams, CreateTableParams, DistributionParam,
+    Field, IndexOption, PrivilegeDef, PrivilegeType, RenameRoutineParams, RoutineDef,
+    RoutineLanguage, RoutineParamDef, RoutineParams, RoutineSecurity, SchemaObjectType, ShardingFn,
+    UserDef, ADMIN_ID,
 };
 use crate::sql::pgproto::{
     with_portals_mut, Portal, PortalDescribe, Statement, StatementDescribe, UserPortalNames,
@@ -899,6 +900,61 @@ fn reenterable_schema_change_request(
 
     // Check parameters
     let params = match ir_node {
+        IrNode::Ddl(Ddl::CreateIndex {
+            name,
+            table_name,
+            columns,
+            unique,
+            index_type,
+            bloom_fpr,
+            page_size,
+            range_size,
+            run_count_per_level,
+            run_size_ratio,
+            dimension,
+            distance,
+            hint,
+            ..
+        }) => {
+            let mut opts: Vec<IndexOption> = Vec::with_capacity(9);
+            opts.push(IndexOption::Unique(unique));
+            if let Some(bloom_fpr) = bloom_fpr {
+                opts.push(IndexOption::BloomFalsePositiveRate(bloom_fpr));
+            }
+            if let Some(page_size) = page_size {
+                opts.push(IndexOption::PageSize(page_size));
+            }
+            if let Some(range_size) = range_size {
+                opts.push(IndexOption::RangeSize(range_size));
+            }
+            if let Some(run_count_per_level) = run_count_per_level {
+                opts.push(IndexOption::RunCountPerLevel(run_count_per_level));
+            }
+            if let Some(run_size_ratio) = run_size_ratio {
+                opts.push(IndexOption::RunSizeRatio(run_size_ratio));
+            }
+            if let Some(dimension) = dimension {
+                opts.push(IndexOption::Dimension(dimension));
+            }
+            if let Some(distance) = distance {
+                opts.push(IndexOption::Distance(distance));
+            }
+            if let Some(hint) = hint {
+                opts.push(IndexOption::Hint(hint));
+            }
+            opts.shrink_to_fit();
+
+            let params = CreateIndexParams {
+                name,
+                space_name: table_name,
+                columns,
+                ty: index_type,
+                opts,
+                owner: current_user,
+            };
+            params.validate(storage)?;
+            Params::CreateIndex(params)
+        }
         IrNode::Ddl(Ddl::CreateProc {
             name,
             params: args,
@@ -1072,6 +1128,17 @@ fn reenterable_schema_change_request(
 
         // Check for conflicts and make the op
         let op = match &params {
+            Params::CreateIndex(params) => {
+                if params.index_exists() {
+                    // Index already exists, no op needed.
+                    return Ok(ConsumerResult { row_count: 0 });
+                }
+                let ddl = params.into_ddl(storage)?;
+                Op::DdlPrepare {
+                    schema_version,
+                    ddl,
+                }
+            }
             Params::CreateProcedure(params) => {
                 if params.func_exists() {
                     // Function already exists, no op needed.
@@ -1477,6 +1544,7 @@ fn reenterable_schema_change_request(
         RenameRoutine(RenameRoutineParams),
         CreateProcedure(CreateProcParams),
         DropProcedure(String, Option<Vec<ParamDef>>),
+        CreateIndex(CreateIndexParams),
     }
 }
 
diff --git a/src/sql/pgproto.rs b/src/sql/pgproto.rs
index d5d20325a5..330c77c722 100644
--- a/src/sql/pgproto.rs
+++ b/src/sql/pgproto.rs
@@ -583,6 +583,7 @@ pub enum QueryType {
 pub enum CommandTag {
     AlterRole = 0,
     CallProcedure = 16,
+    CreateIndex = 18,
     CreateProcedure = 14,
     CreateRole = 1,
     CreateTable = 2,
@@ -614,6 +615,7 @@ impl From<CommandTag> for QueryType {
             | CommandTag::RevokeRole => QueryType::Acl,
             CommandTag::DropTable
             | CommandTag::CreateTable
+            | CommandTag::CreateIndex
             | CommandTag::CreateProcedure
             | CommandTag::RenameRoutine
             | CommandTag::DropProcedure => QueryType::Ddl,
@@ -654,6 +656,7 @@ impl TryFrom<&Node> for CommandTag {
                 Ddl::CreateProc { .. } => Ok(CommandTag::CreateProcedure),
                 Ddl::DropProc { .. } => Ok(CommandTag::DropProcedure),
                 Ddl::RenameRoutine { .. } => Ok(CommandTag::RenameRoutine),
+                Ddl::CreateIndex { .. } => Ok(CommandTag::CreateIndex),
             },
             Node::Relational(rel) => match rel {
                 Relational::Delete { .. } => Ok(CommandTag::Delete),
diff --git a/src/storage.rs b/src/storage.rs
index b8aafc3b4e..2f82e85b5c 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1,4 +1,5 @@
 use tarantool::auth::AuthDef;
+use tarantool::decimal::Decimal;
 use tarantool::error::{Error as TntError, TarantoolErrorCode as TntErrorCode};
 use tarantool::fiber;
 use tarantool::index::{Index, IndexId, IndexIterator, IndexType, IteratorType};
@@ -15,6 +16,7 @@ use tarantool::tlua::{self, LuaError};
 use tarantool::tuple::KeyDef;
 use tarantool::tuple::{Decode, DecodeOwned, Encode};
 use tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer};
+use tarantool::util::NumOrStr;
 
 use crate::access_control::{user_by_id, UserMetadataKind};
 use crate::failure_domain::FailureDomain;
@@ -50,6 +52,7 @@ use std::collections::{HashMap, HashSet};
 use std::iter::Peekable;
 use std::marker::PhantomData;
 use std::rc::Rc;
+use std::str::FromStr;
 use std::time::Duration;
 
 use self::acl::{on_master_drop_role, on_master_drop_user};
@@ -1347,7 +1350,7 @@ impl Properties {
             // Primary index
             id: 0,
             name: "key".into(),
-            itype: IndexType::Tree,
+            ty: IndexType::Tree,
             opts: vec![IndexOption::Unique(true)],
             parts: vec![Part::from("key")],
             // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -1606,7 +1609,7 @@ impl Replicasets {
             // Primary index
             id: 0,
             name: "replicaset_id".into(),
-            itype: IndexType::Tree,
+            ty: IndexType::Tree,
             opts: vec![IndexOption::Unique(true)],
             parts: vec![Part::from("replicaset_id")],
             // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -1672,7 +1675,7 @@ impl PeerAddresses {
             // Primary index
             id: 0,
             name: "raft_id".into(),
-            itype: IndexType::Tree,
+            ty: IndexType::Tree,
             opts: vec![IndexOption::Unique(true)],
             parts: vec![Part::from("raft_id")],
             // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -1782,7 +1785,7 @@ impl Instances {
                 // Primary index
                 id: 0,
                 name: "instance_id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("instance_id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -1794,7 +1797,7 @@ impl Instances {
                 table_id: Self::TABLE_ID,
                 id: 1,
                 name: "raft_id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("raft_id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -1806,7 +1809,7 @@ impl Instances {
                 table_id: Self::TABLE_ID,
                 id: 2,
                 name: "replicaset_id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(false)],
                 parts: vec![Part::from("replicaset_id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2154,7 +2157,7 @@ impl Tables {
                 // Primary index
                 id: 0,
                 name: "id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2166,7 +2169,7 @@ impl Tables {
                 table_id: Self::TABLE_ID,
                 id: 1,
                 name: "name".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("name")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2273,7 +2276,7 @@ impl Indexes {
                 // Primary index
                 id: 0,
                 name: "id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("table_id"), Part::from("id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2285,7 +2288,7 @@ impl Indexes {
                 table_id: Self::TABLE_ID,
                 id: 1,
                 name: "name".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("table_id"), Part::from("name")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2474,25 +2477,35 @@ pub fn ddl_create_index_on_master(
     index_id: IndexId,
 ) -> traft::Result<()> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
-    let pico_index_def = storage
+    let mut pico_index_def = storage
         .indexes
         .get(space_id, index_id)?
         .ok_or_else(|| Error::other(format!("index with id {index_id} not found")))?;
-    let mut opts = IndexOptions::default();
-    opts.r#type = Some(pico_index_def.itype);
-    opts.id = Some(pico_index_def.id);
+    let mut opts = IndexOptions {
+        r#type: Some(pico_index_def.ty),
+        id: Some(pico_index_def.id),
+        ..Default::default()
+    };
+    // We use LUA to create indexes, so we need to switch from C 0-based to LUA 1-based
+    // part enumeration in options.
+    for part in pico_index_def.parts.iter_mut() {
+        let NumOrStr::Num(ref mut num) = part.field else {
+            unreachable!("index part field should be a number");
+        };
+        *num += 1;
+    }
     opts.parts = Some(pico_index_def.parts);
+    let dec_to_f32 = |d: Decimal| f32::from_str(&d.to_string()).expect("decimal to f32");
     for opt in pico_index_def.opts {
         match opt {
             IndexOption::Unique(unique) => opts.unique = Some(unique),
-            IndexOption::IfNotExists(if_not_exists) => opts.if_not_exists = Some(if_not_exists),
             IndexOption::Dimension(dim) => opts.dimension = Some(dim),
             IndexOption::Distance(distance) => opts.distance = Some(distance),
-            IndexOption::BloomFalsePositiveRate(rate) => opts.bloom_fpr = Some(rate as f32),
+            IndexOption::BloomFalsePositiveRate(rate) => opts.bloom_fpr = Some(dec_to_f32(rate)),
             IndexOption::PageSize(size) => opts.page_size = Some(size),
             IndexOption::RangeSize(size) => opts.range_size = Some(size),
             IndexOption::RunCountPerLevel(count) => opts.run_count_per_level = Some(count),
-            IndexOption::RunSizeRatio(ratio) => opts.run_size_ratio = Some(ratio as f32),
+            IndexOption::RunSizeRatio(ratio) => opts.run_size_ratio = Some(dec_to_f32(ratio)),
             IndexOption::Hint(_) => {
                 // FIXME: `hint` option is disabled in Tarantool module.
             }
@@ -2768,7 +2781,7 @@ impl Users {
                 // Primary index
                 id: 0,
                 name: "id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2780,7 +2793,7 @@ impl Users {
                 table_id: Self::TABLE_ID,
                 id: 1,
                 name: "name".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("name")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -2903,7 +2916,7 @@ impl Privileges {
                 // Primary index
                 id: 0,
                 name: "primary".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![
                     Part::from("grantee_id"),
@@ -2920,7 +2933,7 @@ impl Privileges {
                 table_id: Self::TABLE_ID,
                 id: 1,
                 name: "object".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(false)],
                 parts: vec![Part::from("object_type"), Part::from("object_id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -3092,7 +3105,7 @@ impl Tiers {
             // Primary index
             id: 0,
             name: "name".into(),
-            itype: IndexType::Tree,
+            ty: IndexType::Tree,
             opts: vec![IndexOption::Unique(true)],
             parts: vec![Part::from("name")],
             // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -3170,7 +3183,7 @@ impl Routines {
                 // Primary index
                 id: 0,
                 name: "id".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("id")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -3182,7 +3195,7 @@ impl Routines {
                 table_id: Self::TABLE_ID,
                 id: 1,
                 name: "name".into(),
-                itype: IndexType::Tree,
+                ty: IndexType::Tree,
                 opts: vec![IndexOption::Unique(true)],
                 parts: vec![Part::from("name")],
                 // This means the local schema is already up to date and main loop doesn't need to do anything
@@ -4533,8 +4546,6 @@ mod tests {
 
     #[track_caller]
     fn get_field_index(part: &Part, space_fields: &[tarantool::space::Field]) -> usize {
-        use tarantool::util::NumOrStr;
-
         match &part.field {
             NumOrStr::Num(index) => {
                 return *index as _;
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 704c938151..3a6332781e 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -1614,7 +1614,7 @@ impl NodeImpl {
                     table_id: id,
                     id: 0,
                     name: "primary_key".into(),
-                    itype: IndexType::Tree,
+                    ty: IndexType::Tree,
                     opts: vec![IndexOption::Unique(true)],
                     parts: primary_key,
                     operable: false,
@@ -1651,7 +1651,7 @@ impl NodeImpl {
                             table_id: id,
                             id: 1,
                             name: "bucket_id".into(),
-                            itype: IndexType::Tree,
+                            ty: IndexType::Tree,
                             opts: vec![IndexOption::Unique(false)],
                             parts: vec![Part::field(bucket_id_index)
                                 .field_type(IFT::Unsigned)
@@ -1693,7 +1693,7 @@ impl NodeImpl {
                 space_id,
                 index_id,
                 name,
-                itype,
+                ty,
                 opts,
                 by_fields,
                 owner,
@@ -1702,7 +1702,7 @@ impl NodeImpl {
                     table_id: space_id,
                     id: index_id,
                     name,
-                    itype,
+                    ty,
                     opts,
                     parts: by_fields,
                     operable: false,
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 919ec06260..840bfd4e46 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -670,7 +670,7 @@ pub enum Ddl {
         space_id: SpaceId,
         index_id: IndexId,
         name: String,
-        itype: IndexType,
+        ty: IndexType,
         opts: Vec<IndexOption>,
         by_fields: Vec<Part>,
         owner: UserId,
diff --git a/test/conftest.py b/test/conftest.py
index 95cd134a18..a9c35f706c 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -47,6 +47,7 @@ BASE_PORT = 3300
 PORT_RANGE = 200
 
 MAX_LOGIN_ATTEMPTS = 4
+PICO_SERVICE_ID = 32
 
 
 def eprint(*args, **kwargs):
diff --git a/test/int/test_audit.py b/test/int/test_audit.py
index c22b21797c..c2198a7976 100644
--- a/test/int/test_audit.py
+++ b/test/int/test_audit.py
@@ -323,6 +323,27 @@ def test_role(instance: Instance):
     assert drop_role["initiator"] == "bubba"
 
 
+def test_index(instance: Instance):
+    instance.start()
+    instance.sql(
+        """
+        create table "foo" ("val" int not null, primary key ("val"))
+        distributed by ("val")
+        """
+    )
+    instance.sql(""" create index "foo_idx" on "foo" ("val") """)
+    instance.terminate()
+
+    events = AuditFile(instance.audit_flag_value).events()
+
+    create_index = take_until_title(events, "create_index")
+    assert create_index is not None
+    assert create_index["name"] == "foo_idx"
+    assert create_index["message"] == f"created index `{create_index['name']}`"
+    assert create_index["severity"] == "medium"
+    assert create_index["initiator"] == "pico_service"
+
+
 def assert_instance_expelled(expelled_instance: Instance, instance: Instance):
     info = instance.call(".proc_instance_info", expelled_instance.instance_id)
     grades = (info["current_grade"]["variant"], info["target_grade"]["variant"])
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 2639d32b25..d592dbabd1 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -426,7 +426,7 @@ Insert({_pico_user}, [31,"super",0,null,1,"role"]))|
 Insert({_pico_table}, [{_pico_table},"_pico_table",["global"],[["id","unsigned",false],["name","string",false],["distribution","array",false],["format","array",false],["schema_version","unsigned",false],["operable","boolean",false],["engine","string",false],["owner","unsigned",false]],0,true,"memtx",1]),
 Insert({_pico_index}, [{_pico_table},0,"id","tree",[{{"unique":true}}],[["id",null,null,null,null]],true,0,1]),
 Insert({_pico_index}, [{_pico_table},1,"name","tree",[{{"unique":true}}],[["name",null,null,null,null]],true,0,1]),
-Insert({_pico_table}, [{_pico_index},"_pico_index",["global"],[["table_id","unsigned",false],["id","unsigned",false],["name","string",false],["itype","string",false],["opts","array",false],["parts","array",false],["operable","boolean",false],["schema_version","unsigned",false],["owner","unsigned",false]],0,true,"memtx",1]),
+Insert({_pico_table}, [{_pico_index},"_pico_index",["global"],[["table_id","unsigned",false],["id","unsigned",false],["name","string",false],["type","string",false],["opts","array",false],["parts","array",false],["operable","boolean",false],["schema_version","unsigned",false],["owner","unsigned",false]],0,true,"memtx",1]),
 Insert({_pico_index}, [{_pico_index},0,"id","tree",[{{"unique":true}}],[["table_id",null,null,null,null],["id",null,null,null,null]],true,0,1]),
 Insert({_pico_index}, [{_pico_index},1,"name","tree",[{{"unique":true}}],[["table_id",null,null,null,null],["name",null,null,null,null]],true,0,1]),
 Insert({_pico_table}, [{_pico_peer_address},"_pico_peer_address",["global"],[["raft_id","unsigned",false],["address","string",false]],0,true,"memtx",1]),
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index eaa4b4214b..6fbaec57ae 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -1,9 +1,6 @@
 import pytest
 import time
-from conftest import Cluster, ReturnError
-
-
-PICO_SERVICE_ID = 32
+from conftest import PICO_SERVICE_ID, Cluster, ReturnError
 
 
 def test_ddl_abort(cluster: Cluster):
@@ -83,7 +80,7 @@ def test_ddl_lua_api(cluster: Cluster):
         )
     )
     space_id = 1027
-    initiator_id = 32  # pico_service
+    initiator_id = PICO_SERVICE_ID
     pico_space_def = [
         space_id,
         "space 2",
@@ -280,7 +277,7 @@ def test_ddl_create_table_bulky(cluster: Cluster):
     assert i4.next_schema_version() == 3
 
     # Space was created and is operable
-    initiator_id = 32  # pico_service
+    initiator_id = PICO_SERVICE_ID
     pico_space_def = [
         space_id,
         "stuff",
@@ -392,7 +389,7 @@ def test_ddl_create_sharded_space(cluster: Cluster):
 
     ############################################################################
     # Space was created and is operable
-    initiator_id = 32  # pico_service
+    initiator_id = PICO_SERVICE_ID
     pico_space_def = [
         space_id,
         "stuff",
@@ -739,7 +736,7 @@ def test_ddl_create_table_from_snapshot_at_boot(cluster: Cluster):
         ),
     )
 
-    initiator_id = 32  # pico_service,
+    initiator_id = PICO_SERVICE_ID
     tt_space_def = [
         space_id,
         initiator_id,
@@ -823,7 +820,7 @@ def test_ddl_create_table_from_snapshot_at_catchup(cluster: Cluster):
     i1.raft_wait_index(index)
     i2.raft_wait_index(index)
 
-    initiator_id = 32  # pico_service
+    initiator_id = PICO_SERVICE_ID
     tt_space_def = [
         space_id,
         initiator_id,
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 3c49931afb..17fdbc010e 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -3123,3 +3123,128 @@ box error: AccessDenied: Alter access to user 'boba' is denied for user 'biba'.\
     # Check that we can create a new user "boba" after rename him
     i1.create_user(with_name=boba, with_password=password)
     assert boba in names_from_pico_user_table()
+
+
+def test_index(cluster: Cluster):
+    cluster.deploy(instance_count=1)
+    i1 = cluster.instances[0]
+
+    # Sharded memtx table
+    ddl = i1.sql(
+        """
+        create table t (a text not null, b text not null, c text, primary key (a))
+        using memtx
+        distributed by (a)
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 1
+
+    # Global table
+    ddl = i1.sql(
+        """
+        create table g (a int not null, b text not null, c text, primary key (a))
+        distributed globally
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 1
+
+    # Sharded vinyl table
+    ddl = i1.sql(
+        """
+        create table v (a int not null, b text not null, c text, primary key (a))
+        using vinyl
+        distributed by (a)
+        option (timeout = 3)
+        """
+    )
+
+    # Check that created index appears in _pico_index table.
+    ddl = i1.sql(""" create index i0 on t (a) """)
+    assert ddl["row_count"] == 1
+    data = i1.sql(""" select * from "_pico_index" where "name" = 'I0' """)
+    assert data["rows"] != []
+
+    # Successful tree index creation with default options
+    ddl = i1.sql(""" create index i1 on t (a, b) """)
+    assert ddl["row_count"] == 1
+
+    # Unique index can be created only on the sharding key for sharded tables.
+    invalid_unique = (
+        "unique index for the sharded table must duplicate its sharding key columns"
+    )
+    with pytest.raises(ReturnError, match=invalid_unique):
+        i1.sql(""" create unique index i2 on t using tree (b) """)
+
+    # Successful unique tree index creation on the sharding key.
+    ddl = i1.sql(""" create unique index i2 on t using tree (a) """)
+    assert ddl["row_count"] == 1
+
+    # No restrictions on unique index for globally distributed tables.
+    ddl = i1.sql(""" create unique index i3 on g using tree (b) """)
+
+    # Successful create a tree index with corresponding options.
+    ddl = i1.sql(""" create index i4 on t using tree (c) with (hint = true) """)
+    assert ddl["row_count"] == 1
+
+    # Fail to create a tree index with wrong options.
+    invalid_tree_option = "index type tree does not support option"
+    with pytest.raises(ReturnError, match=invalid_tree_option):
+        i1.sql(""" create index i5 on t using tree (c) with (distance = euclid) """)
+    with pytest.raises(ReturnError, match=invalid_tree_option):
+        i1.sql(""" create index i6 on t using tree (c) with (dimension = 42) """)
+
+    # RTree indexes can't be created via SQL at the moment as they require array columns
+    # that are not supported yet.
+    non_array_rtree = "index type rtree does not support column type"
+    with pytest.raises(ReturnError, match=non_array_rtree):
+        i1.sql(""" create index i11 on t using rtree (b) """)
+
+    # Fail to create an rtree index from nullable columns.
+    nullable_rtree = "index type rtree does not support nullable columns"
+    with pytest.raises(ReturnError, match=nullable_rtree):
+        i1.sql(""" create index i12 on t using rtree (c) """)
+
+    # Fail to create an rtree index with wrong options.
+    invalid_rtree_option = "index type rtree does not support option"
+    with pytest.raises(ReturnError, match=invalid_rtree_option):
+        i1.sql(""" create index i13 on t using rtree (b) with (hint = true) """)
+
+    # Successful bitset index creation.
+    ddl = i1.sql(""" create index i14 on t using bitset (b) """)
+
+    # Fail to create a bitset index from nullable columns.
+    nullable_bitset = "index type bitset does not support nullable columns"
+    with pytest.raises(ReturnError, match=nullable_bitset):
+        i1.sql(""" create index i15 on t using bitset (c) """)
+
+    # Fail to create unique bitset index.
+    unique_bitset = "index type bitset does not support unique indexes"
+    with pytest.raises(ReturnError, match=unique_bitset):
+        i1.sql(""" create unique index i16 on t using bitset (a) """)
+
+    # Fail to create bitset index with column types other then string, number or varbinary.
+    invalid_bitset = "index type bitset does not support column type"
+    with pytest.raises(ReturnError, match=invalid_bitset):
+        i1.sql(""" create index i17 on v using bitset (a) """)
+
+    # Successful hash index creation.
+    ddl = i1.sql(""" create unique index i17 on t using hash (a) """)
+    assert ddl["row_count"] == 1
+
+    # Fail to create a non-unique hash index.
+    non_unique_hash = "index type hash does not support non-unique indexes"
+    with pytest.raises(ReturnError, match=non_unique_hash):
+        i1.sql(""" create index i18 on t using hash (c) """)
+
+    # Fail to create an index on memtex table with vinyl options.
+    invalid_memtx = "table engine memtx does not support option"
+    with pytest.raises(ReturnError, match=invalid_memtx):
+        i1.sql(""" create index i7 on t (b) with (page_size = 42) """)
+    with pytest.raises(ReturnError, match=invalid_memtx):
+        i1.sql(""" create index i8 on t (b) with (range_size = 42) """)
+    with pytest.raises(ReturnError, match=invalid_memtx):
+        i1.sql(""" create index i9 on t (b) with (run_count_per_level = 42) """)
+    with pytest.raises(ReturnError, match=invalid_memtx):
+        i1.sql(""" create index i10 on t (b) with (run_size_ratio = 0.1) """)
-- 
GitLab