diff --git a/src/lib.rs b/src/lib.rs
index a5d37fca81134d134c4cc654b3b912940e050680..486027fb6f1b9749b9c7c825ce37edd9a784c70a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -14,14 +14,15 @@ use std::time::{Duration, Instant};
 use storage::Clusterwide;
 use storage::ToEntryIter as _;
 use storage::{ClusterwideSpace, PropertyName};
-use traft::rpc;
 use traft::rpc::{join, update_instance};
 use traft::RaftSpaceAccess;
+use traft::{rpc, RaftTerm};
 
 use protobuf::Message as _;
 
 use crate::instance::grade::TargetGradeVariant;
 use crate::instance::InstanceId;
+use crate::schema::CreateSpaceParams;
 use crate::tlog::set_log_level;
 use crate::traft::event::Event;
 use crate::traft::op::{self, Op};
@@ -462,21 +463,45 @@ fn picolib_setup(args: &args::Run) {
         tlua::Function::new(
             |op: op::DmlInLua, predicate: rpc::cas::Predicate| -> traft::Result<RaftIndex> {
                 let op = op::Dml::from_lua_args(op).map_err(Error::other)?;
-                compare_and_swap(op.into(), predicate)
+                let (index, _) = compare_and_swap(op.into(), predicate)?;
+                Ok(index)
             },
         ),
     );
+
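+    // A rough sketch of the Lua call this registers (mirrors
+    // `test_ddl_create_space_lua` in test/int/test_ddl.py); the exact field
+    // names are dictated by `CreateSpaceParams`' `LuaRead` implementation:
+    //
+    //     pico.create_space({
+    //         id = 1,
+    //         name = 'some_name',
+    //         format = {{name = 'id', type = 'unsigned', is_nullable = false}},
+    //         primary_key = {'id'},
+    //         distribution = 'global',
+    //     }, 3.0)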
+    luamod.set("create_space", {
+        tlua::function2(
+            |params: CreateSpaceParams,
+             // The timeout specifies how long the caller is ready to wait for the ddl to be applied.
+             // However it does not guarantee that the ddl will be aborted if waiting for the commit times out.
+             timeout_sec: f64|
+             -> traft::Result<RaftIndex> {
+                let timeout = Duration::from_secs_f64(timeout_sec);
+                let storage = &node::global()?.storage;
+                params.validate(storage)?;
+                // TODO: check space creation and rollback
+                // box.begin() box.schema.space.create() box.rollback()
+                let op = params.into_ddl(storage);
+                let index = schema::prepare_ddl(op, timeout)?;
+                let commit_index = schema::wait_for_ddl_commit(index, timeout)?;
+                Ok(commit_index)
+            },
+        )
+    });
 }
 
 /// Performs a clusterwide compare and swap operation.
 ///
 /// E.g. it checks the `predicate` on leader and if no conflicting entries were found
-/// appends the `op` to the raft log and returns its index.
+/// appends the `op` to the raft log and returns its index and term.
 ///
 /// # Errors
 /// See [`rpc::cas::Error`] for CaS-specific errors.
 /// It can also return general picodata errors in cases of faulty network or storage.
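+///
+/// # Example
+///
+/// A rough usage sketch (not compiled here): the `index` and `term` of the
+/// predicate would normally come from a raft read state, as in
+/// `schema::prepare_ddl`.
+///
+/// ```ignore
+/// use picodata::compare_and_swap;
+/// use picodata::storage::{ClusterwideSpace, PropertyName};
+/// use picodata::traft::op::Op;
+/// use picodata::traft::rpc::cas::{Predicate, Range};
+///
+/// // Fail (and let the caller retry) if a schema change was raised
+/// // after `index`/`term` were observed.
+/// let predicate = Predicate {
+///     index,
+///     term,
+///     ranges: vec![
+///         Range::new(ClusterwideSpace::Property).eq((PropertyName::PendingSchemaChange,)),
+///     ],
+/// };
+/// // A no-op is used here just to keep the sketch small; any `Op` works.
+/// let (new_index, new_term) = compare_and_swap(Op::Nop, predicate)?;
+/// ```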
-pub fn compare_and_swap(op: Op, predicate: rpc::cas::Predicate) -> traft::Result<RaftIndex> {
+pub fn compare_and_swap(
+    op: Op,
+    predicate: rpc::cas::Predicate,
+) -> traft::Result<(RaftIndex, RaftTerm)> {
     let node = node::global()?;
     let request = rpc::cas::Request {
         cluster_id: node
@@ -504,7 +529,7 @@ pub fn compare_and_swap(op: Op, predicate: rpc::cas::Predicate) -> traft::Result
         let resp = rpc::network_call(&leader_address, &request);
         let resp = fiber::block_on(resp.timeout(Duration::from_secs(3)));
         match resp {
-            Ok(rpc::cas::Response { index, .. }) => return Ok(index),
+            Ok(rpc::cas::Response { index, term }) => return Ok((index, term)),
             Err(e) => {
                 tlog!(Warning, "{e}");
                 return Err(e.into());
diff --git a/src/schema.rs b/src/schema.rs
index 67740805b6a1f64c97b53703dd26cd6ccb774860..88998daebeae1966ea5937d2a6d69309f5757cbe 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -1,19 +1,27 @@
-use crate::storage::set_pico_schema_version;
-use crate::traft;
-use crate::traft::op::Ddl;
-use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
+use std::time::{Duration, Instant};
+
+use tarantool::space::FieldType;
 use tarantool::{
     index::Metadata as IndexMetadata,
     index::{IndexId, Part},
     schema::space::SpaceMetadata,
-    space::{Field, SpaceId},
+    space::SpaceId,
     space::{Space, SystemSpace},
+    tlua::{self, LuaRead},
     tuple::Encode,
     util::Value,
 };
 
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+
+use crate::compare_and_swap;
+use crate::storage::{set_pico_schema_version, Clusterwide, ClusterwideSpace, PropertyName};
+use crate::traft::op::{Ddl, DdlBuilder, Op};
+use crate::traft::{self, event, node, rpc, RaftIndex};
+
 /// Space definition.
 ///
 /// Describes a user-defined space.
@@ -22,10 +30,11 @@ pub struct SpaceDef {
     pub id: SpaceId,
     pub name: String,
     pub distribution: Distribution,
-    pub format: Vec<Field>,
+    pub format: Vec<tarantool::space::Field>,
     pub schema_version: u64,
     pub operable: bool,
 }
+
 impl Encode for SpaceDef {}
 
 impl SpaceDef {
@@ -66,7 +75,7 @@ impl SpaceDef {
 }
 
 /// Defines how to distribute tuples in a space across replicasets.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead)]
 #[serde(rename_all = "snake_case")]
 #[serde(tag = "kind")]
 pub enum Distribution {
@@ -119,6 +128,7 @@ pub struct IndexDef {
     pub operable: bool,
     pub unique: bool,
 }
+
 impl Encode for IndexDef {}
 
 impl IndexDef {
@@ -188,3 +198,382 @@ pub fn try_space_field_type_to_index_field_type(
     };
     res
 }
+
+#[derive(Debug, Error)]
+pub enum DdlError {
+    #[error("space creation failed: {0}")]
+    CreateSpace(#[from] CreateSpaceError),
+    #[error("ddl operation was aborted")]
+    Aborted,
+}
+
+#[derive(Debug, Error)]
+pub enum CreateSpaceError {
+    #[error("space with id {0} already exists")]
+    IdExists(SpaceId),
+    #[error("space with name {0} already exists")]
+    NameExists(String),
+    #[error("several fields have the same name: {0}")]
+    DuplicateFieldName(String),
+    #[error("no field with name: {0}")]
+    FieldUndefined(String),
+}
+
+// TODO: Add `LuaRead` to tarantool::space::Field and use it
+#[derive(Clone, Debug, LuaRead)]
+pub struct Field {
+    pub name: String, // TODO(gmoshkin): &str
+    pub r#type: FieldType,
+    pub is_nullable: bool,
+}
+
+impl From<Field> for tarantool::space::Field {
+    fn from(field: Field) -> Self {
+        tarantool::space::Field {
+            name: field.name,
+            field_type: field.r#type,
+            is_nullable: field.is_nullable,
+        }
+    }
+}
+
+#[derive(Clone, Debug, LuaRead)]
+pub struct CreateSpaceParams {
+    id: Option<SpaceId>,
+    name: String,
+    format: Vec<Field>,
+    primary_key: Vec<String>,
+    distribution: Distribution,
+}
+
+impl CreateSpaceParams {
+    pub fn validate(&self, storage: &Clusterwide) -> traft::Result<()> {
+        // Check that there is no space with this name
+        if storage.spaces.by_name(&self.name)?.is_some() {
+            return Err(
+                DdlError::CreateSpace(CreateSpaceError::NameExists(self.name.clone())).into(),
+            );
+        }
+        // Check that there is no space with this id (if specified)
+        if let Some(id) = self.id {
+            if storage.spaces.get(id)?.is_some() {
+                return Err(DdlError::CreateSpace(CreateSpaceError::IdExists(id)).into());
+            }
+        }
+        // All field names are unique
+        let mut field_names = HashSet::new();
+        for field in &self.format {
+            if !field_names.insert(field.name.as_str()) {
+                return Err(DdlError::CreateSpace(CreateSpaceError::DuplicateFieldName(
+                    field.name.clone(),
+                ))
+                .into());
+            }
+        }
+        // All primary key components exist in fields
+        for part in &self.primary_key {
+            if !field_names.contains(part.as_str()) {
+                return Err(
+                    DdlError::CreateSpace(CreateSpaceError::FieldUndefined(part.clone())).into(),
+                );
+            }
+        }
+        // All sharding key components exist in fields
+        match &self.distribution {
+            Distribution::Global => (),
+            Distribution::ShardedImplicitly { sharding_key, .. } => {
+                let mut parts = HashSet::new();
+                for part in sharding_key {
+                    if !field_names.contains(part.as_str()) {
+                        return Err(DdlError::CreateSpace(CreateSpaceError::FieldUndefined(
+                            part.clone(),
+                        ))
+                        .into());
+                    }
+                    // And all parts are unique
+                    if !parts.insert(part.as_str()) {
+                        return Err(DdlError::CreateSpace(CreateSpaceError::DuplicateFieldName(
+                            part.clone(),
+                        ))
+                        .into());
+                    }
+                }
+            }
+            Distribution::ShardedByField { field } => {
+                if !field_names.contains(field.as_str()) {
+                    return Err(DdlError::CreateSpace(CreateSpaceError::FieldUndefined(
+                        field.clone(),
+                    ))
+                    .into());
+                }
+            }
+        }
+        Ok(())
+    }
+
+    pub fn into_ddl(self, _storage: &Clusterwide) -> Ddl {
+        let id = if let Some(id) = self.id {
+            id
+        } else {
+            todo!("max_id + 1");
+        };
+        let primary_key: Vec<_> = self.primary_key.into_iter().map(Part::field).collect();
+        let format: Vec<_> = self
+            .format
+            .into_iter()
+            .map(tarantool::space::Field::from)
+            .collect();
+        Ddl::CreateSpace {
+            id,
+            name: self.name,
+            format,
+            primary_key,
+            distribution: self.distribution,
+        }
+    }
+}
+
+/// Waits for a pending ddl to be either `Committed` or `Aborted` by the governor.
+///
+/// Returns the index of the corresponding `DdlCommit` entry.
+///
+/// If `timeout` is reached earlier, returns an error.
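+///
+/// # Example
+///
+/// A rough sketch (not compiled here) of telling a committed ddl apart from an
+/// aborted one; `prepare_index` stands for the index returned by [`prepare_ddl`]:
+///
+/// ```ignore
+/// use std::time::Duration;
+/// use picodata::schema::{wait_for_ddl_commit, DdlError};
+/// use picodata::traft::error::Error;
+///
+/// match wait_for_ddl_commit(prepare_index, Duration::from_secs(3)) {
+///     // The governor committed the ddl, it is applied locally at `commit_index`.
+///     Ok(commit_index) => println!("ddl committed at index {}", commit_index),
+///     // The governor aborted the ddl.
+///     Err(Error::Ddl(DdlError::Aborted)) => println!("ddl was aborted"),
+///     // Timeout or some other picodata error.
+///     Err(e) => return Err(e),
+/// }
+/// ```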
+pub fn wait_for_ddl_commit(
+    prepare_commit: RaftIndex,
+    timeout: Duration,
+) -> traft::Result<RaftIndex> {
+    let raft_storage = &node::global()?.raft_storage;
+    let deadline = Instant::now() + timeout;
+    let last_seen = prepare_commit;
+    loop {
+        let cur_applied = raft_storage.applied()?.expect("commit is always persisted");
+        let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1)?;
+        for entry in new_entries {
+            if entry.entry_type != raft::prelude::EntryType::EntryNormal {
+                continue;
+            }
+            let index = entry.index;
+            let op = entry.into_op().unwrap_or(Op::Nop);
+            match op {
+                Op::DdlCommit => return Ok(index),
+                Op::DdlAbort => return Err(DdlError::Aborted.into()),
+                _ => (),
+            }
+        }
+
+        if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
+            event::wait_timeout(event::Event::EntryApplied, timeout)?;
+        } else {
+            return Err(traft::error::Error::Timeout);
+        }
+    }
+}
+
+/// Waits until there is no pending schema change.
+///
+/// If `timeout` is reached earlier, returns an error.
+fn wait_for_no_pending_schema_change(
+    storage: &Clusterwide,
+    timeout: Duration,
+) -> traft::Result<()> {
+    let deadline = Instant::now() + timeout;
+    loop {
+        if storage.properties.pending_schema_change()?.is_none() {
+            return Ok(());
+        }
+
+        if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
+            event::wait_timeout(event::Event::EntryApplied, timeout)?;
+        } else {
+            return Err(traft::error::Error::Timeout);
+        }
+    }
+}
+
+/// Prepares a ddl for execution on the cluster-wide schema
+/// by proposing a `DdlPrepare` raft entry.
+/// After that the ddl will be either `Committed` or `Aborted` by the governor.
+///
+/// If `timeout` is reached earlier, returns an error.
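+///
+/// # Example
+///
+/// A rough sketch (not compiled here) of the full flow, mirroring what the
+/// `pico.create_space` Lua entry point does:
+///
+/// ```ignore
+/// use std::time::Duration;
+/// use picodata::schema::{prepare_ddl, wait_for_ddl_commit};
+/// use picodata::traft::op::Ddl;
+///
+/// let timeout = Duration::from_secs(3);
+/// // Propose the DdlPrepare raft entry ...
+/// let prepare_index = prepare_ddl(Ddl::DropSpace { id: 1 }, timeout)?;
+/// // ... and wait for the governor to either commit or abort it.
+/// let commit_index = wait_for_ddl_commit(prepare_index, timeout)?;
+/// ```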
+pub fn prepare_ddl(op: Ddl, timeout: Duration) -> traft::Result<RaftIndex> {
+    loop {
+        let storage = &node::global()?.storage;
+        let raft_storage = &node::global()?.raft_storage;
+        let op = DdlBuilder::new(storage)?.with_op(op.clone());
+        wait_for_no_pending_schema_change(storage, timeout)?;
+        let index = node::global()?.wait_for_read_state(timeout)?;
+        let term = raft::Storage::term(raft_storage, index)?;
+        let predicate = rpc::cas::Predicate {
+            index,
+            term,
+            ranges: vec![
+                rpc::cas::Range::new(ClusterwideSpace::Property)
+                    .eq((PropertyName::PendingSchemaChange,)),
+                rpc::cas::Range::new(ClusterwideSpace::Property)
+                    .eq((PropertyName::CurrentSchemaVersion,)),
+                rpc::cas::Range::new(ClusterwideSpace::Property)
+                    .eq((PropertyName::NextSchemaVersion,)),
+            ],
+        };
+        let (index, term) = compare_and_swap(op, predicate)?;
+        rpc::sync::wait_for_index_timeout(index, raft_storage, timeout)?;
+        if raft::Storage::term(raft_storage, index)? != term {
+            // leader switched - retry
+            continue;
+        }
+        return Ok(index);
+    }
+}
+
+mod tests {
+    use tarantool::space::FieldType;
+
+    use super::*;
+
+    #[::tarantool::test]
+    fn ddl() {
+        let storage = Clusterwide::new().unwrap();
+        let existing_space = "existing_space";
+        let existing_id = 0;
+        storage
+            .spaces
+            .insert(&SpaceDef {
+                id: existing_id,
+                name: existing_space.into(),
+                distribution: Distribution::Global,
+                format: vec![],
+                schema_version: 0,
+                operable: true,
+            })
+            .unwrap();
+
+        let new_space = "new_space";
+        let new_id = 1;
+        let field1 = Field {
+            name: "field1".into(),
+            r#type: FieldType::Any,
+            is_nullable: false,
+        };
+        let field2 = Field {
+            name: "field2".into(),
+            r#type: FieldType::Any,
+            is_nullable: false,
+        };
+
+        let err = CreateSpaceParams {
+            id: Some(existing_id),
+            name: new_space.into(),
+            format: vec![],
+            primary_key: vec![],
+            distribution: Distribution::Global,
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: space with id 0 already exists"
+        );
+
+        let err = CreateSpaceParams {
+            id: Some(new_id),
+            name: existing_space.into(),
+            format: vec![],
+            primary_key: vec![],
+            distribution: Distribution::Global,
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: space with name existing_space already exists"
+        );
+
+        let err = CreateSpaceParams {
+            id: Some(new_id),
+            name: new_space.into(),
+            format: vec![field1.clone(), field1.clone()],
+            primary_key: vec![],
+            distribution: Distribution::Global,
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: several fields have the same name: field1"
+        );
+
+        let err = CreateSpaceParams {
+            id: Some(new_id),
+            name: new_space.into(),
+            format: vec![field1.clone()],
+            primary_key: vec![field2.name.clone()],
+            distribution: Distribution::Global,
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: no field with name: field2"
+        );
+
+        let err = CreateSpaceParams {
+            id: Some(new_id),
+            name: new_space.into(),
+            format: vec![field1.clone()],
+            primary_key: vec![],
+            distribution: Distribution::ShardedImplicitly {
+                sharding_key: vec![field2.name.clone()],
+                sharding_fn: ShardingFn::Md5,
+            },
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: no field with name: field2"
+        );
+
+        let err = CreateSpaceParams {
+            id: Some(new_id),
+            name: new_space.into(),
+            format: vec![field1.clone()],
+            primary_key: vec![],
+            distribution: Distribution::ShardedImplicitly {
+                sharding_key: vec![field1.name.clone(), field1.name.clone()],
+                sharding_fn: ShardingFn::Md5,
+            },
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: several fields have the same name: field1"
+        );
+
+        let err = CreateSpaceParams {
+            id: Some(new_id),
+            name: new_space.into(),
+            format: vec![field1.clone()],
+            primary_key: vec![],
+            distribution: Distribution::ShardedByField {
+                field: field2.name.clone(),
+            },
+        }
+        .validate(&storage)
+        .unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "ddl failed: space creation failed: no field with name: field2"
+        );
+
+        CreateSpaceParams {
+            id: Some(new_id),
+            name: new_space.into(),
+            format: vec![field1, field2.clone()],
+            primary_key: vec![field2.name.clone()],
+            distribution: Distribution::ShardedByField { field: field2.name },
+        }
+        .validate(&storage)
+        .unwrap();
+    }
+}
diff --git a/src/storage.rs b/src/storage.rs
index 4499f0b8991bf2f50d0f57a3665c37fa68b23e11..4b5b0f28f6b452e92dfa0e4579a1f88079e72703 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -755,6 +755,17 @@ impl Properties {
             .unwrap_or_default();
         Ok(res)
     }
+
+    #[inline]
+    pub fn next_schema_version(&self) -> tarantool::Result<u64> {
+        let res = if let Some(version) = self.get(PropertyName::NextSchemaVersion)? {
+            version
+        } else {
+            let current = self.current_schema_version()?;
+            current + 1
+        };
+        Ok(res)
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1373,7 +1384,7 @@ impl Spaces {
     }
 
     #[inline]
-    pub fn by_name(&self, name: String) -> tarantool::Result<Option<SpaceDef>> {
+    pub fn by_name(&self, name: &str) -> tarantool::Result<Option<SpaceDef>> {
         match self.index_name.get(&[name])? {
             Some(tuple) => tuple.decode().map(Some),
             None => Ok(None),
diff --git a/src/traft/error.rs b/src/traft/error.rs
index 8e4aad3479f6646974220797b75d74d23ce8f5fc..0ca498c01c004375b433afe92ca365a09537d676 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -63,6 +63,8 @@ pub enum Error {
 
     #[error("compare-and-swap request failed: {0}")]
     Cas(#[from] crate::traft::rpc::cas::Error),
+    #[error("ddl failed: {0}")]
+    Ddl(#[from] crate::schema::DdlError),
 
     #[error("sbroad: {0}")]
     Sbroad(#[from] sbroad::errors::SbroadError),
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 7a65cfea82e31e6b72b5cd175952fc6365ad1fea..81396ea379fbb4f3fd7d9de2910454a2a32d1fff 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -184,7 +184,7 @@ impl Entry {
 
     /// Returns the contained `Op` if it's an `EntryNormal`
     /// consuming `self` by value.
-    fn into_op(self) -> Option<Op> {
+    pub fn into_op(self) -> Option<Op> {
         match self.context {
             Some(EntryContext::Normal(v)) => Some(v.op),
             Some(EntryContext::ConfChange(_)) => None,
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 8b5f09b36d2ed8255069b5cd6a714ecba742c879..699ca904687d79fdbed316eb3740c963925d99f0 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -1,6 +1,6 @@
 use crate::instance::Instance;
 use crate::schema::Distribution;
-use crate::storage::{ClusterwideSpace, ClusterwideSpaceIndex};
+use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex};
 use ::tarantool::index::{IndexId, Part};
 use ::tarantool::space::{Field, SpaceId};
 use ::tarantool::tlua;
@@ -402,73 +402,33 @@ pub enum Ddl {
 ///
 /// # Example
 /// ```no_run
-/// use picodata::traft::op::DdlBuilder;
+/// use picodata::traft::op::{DdlBuilder, Ddl};
 ///
 /// // Assuming that space `1` was created.
 /// let op = DdlBuilder::with_schema_version(1)
-///     .drop_space(1);
+///     .with_op(Ddl::DropSpace { id: 1 });
 /// ```
 pub struct DdlBuilder {
     schema_version: u64,
 }
 
 impl DdlBuilder {
+    pub fn new(storage: &Clusterwide) -> super::Result<Self> {
+        let version = storage.properties.next_schema_version()?;
+        Ok(Self::with_schema_version(version))
+    }
+
     /// Sets current schema version.
     pub fn with_schema_version(version: u64) -> Self {
         Self {
             schema_version: version,
         }
     }
-    /// Creates an `Op::DdlPrepare(Ddl::CreateSpace)`.
-    #[inline(always)]
-    pub fn create_space(
-        &self,
-        id: SpaceId,
-        name: String,
-        format: Vec<tarantool::space::Field>,
-        primary_key: Vec<Part>,
-        distribution: Distribution,
-    ) -> Op {
-        Op::DdlPrepare {
-            ddl: Ddl::CreateSpace {
-                id,
-                name,
-                format,
-                primary_key,
-                distribution,
-            },
-            schema_version: self.schema_version,
-        }
-    }
-
-    /// Creates an `Op::DdlPrepare(Ddl::DropSpace)`.
-    #[inline(always)]
-    pub fn drop_space(&self, id: SpaceId) -> Op {
-        Op::DdlPrepare {
-            ddl: Ddl::DropSpace { id },
-            schema_version: self.schema_version,
-        }
-    }
-
-    /// Creates an `Op::DdlPrepare(Ddl::CreateIndex)`.
-    #[inline(always)]
-    pub fn create_index(&self, space_id: SpaceId, index_id: IndexId, by_fields: Vec<Part>) -> Op {
-        Op::DdlPrepare {
-            ddl: Ddl::CreateIndex {
-                space_id,
-                index_id,
-                by_fields,
-            },
-            schema_version: self.schema_version,
-        }
-    }
 
-    /// Creates an `Op::DdlPrepare(Ddl::DropIndex)`.
-    #[inline(always)]
-    pub fn drop_index(&self, space_id: SpaceId, index_id: IndexId) -> Op {
+    pub fn with_op(&self, op: Ddl) -> Op {
         Op::DdlPrepare {
-            ddl: Ddl::DropIndex { space_id, index_id },
             schema_version: self.schema_version,
+            ddl: op,
         }
     }
 }
diff --git a/src/traft/rpc/cas.rs b/src/traft/rpc/cas.rs
index 02e432e1a32e78d4102c3f4df693abd20a5dbe9e..1a9407b104d4383a9ae6b455556a0d102275af34 100644
--- a/src/traft/rpc/cas.rs
+++ b/src/traft/rpc/cas.rs
@@ -259,6 +259,7 @@ impl Predicate {
                 PendingSchemaChange.into(),
                 PendingSchemaVersion.into(),
                 CurrentSchemaVersion.into(),
+                NextSchemaVersion.into(),
             ]
             .into_iter()
             .map(|key: &str| Tuple::new(&(key,)))
@@ -455,16 +456,20 @@ mod tests {
         let space_id = 1;
         let index_id = 1;
 
-        let create_space = builder.create_space(
+        let create_space = builder.with_op(Ddl::CreateSpace {
+            id: space_id,
+            name: space_name.into(),
+            format: vec![],
+            primary_key: vec![],
+            distribution: Distribution::Global,
+        });
+        let drop_space = builder.with_op(Ddl::DropSpace { id: space_id });
+        let create_index = builder.with_op(Ddl::CreateIndex {
             space_id,
-            space_name.into(),
-            vec![],
-            vec![],
-            Distribution::Global,
-        );
-        let drop_space = builder.drop_space(space_id);
-        let create_index = builder.create_index(space_id, index_id, vec![]);
-        let drop_index = builder.drop_index(space_id, index_id);
+            index_id,
+            by_fields: vec![],
+        });
+        let drop_index = builder.with_op(Ddl::DropIndex { space_id, index_id });
 
         let commit = Op::DdlCommit;
         let abort = Op::DdlAbort;
@@ -472,6 +477,7 @@ mod tests {
         let pending_schema_change = (PropertyName::PendingSchemaChange.to_string(),);
         let pending_schema_version = (PropertyName::PendingSchemaChange.to_string(),);
         let current_schema_version = (PropertyName::CurrentSchemaVersion.to_string(),);
+        let next_schema_version = (PropertyName::NextSchemaVersion.to_string(),);
 
         // create_space
         assert!(t(&create_space, Range::new(props).eq(&pending_schema_change)).is_err());
@@ -538,6 +544,7 @@ mod tests {
         assert!(t(&abort, Range::new(props).eq(&pending_schema_change)).is_err());
         assert!(t(&abort, Range::new(props).eq(&pending_schema_version)).is_err());
         assert!(t(&abort, Range::new(props).eq(&current_schema_version)).is_err());
+        assert!(t(&abort, Range::new(props).eq(&next_schema_version)).is_err());
         assert!(t(&abort, Range::new(props).eq(("another_key",))).is_ok());
 
         assert!(t(&abort, Range::new("another_space").eq(("any_key",))).is_ok());
diff --git a/src/traft/rpc/sync.rs b/src/traft/rpc/sync.rs
index 2e190576eca7307082b04e95af09dea76af44a19..ee53d5c5c338e2ab26878b5c841683294785319a 100644
--- a/src/traft/rpc/sync.rs
+++ b/src/traft/rpc/sync.rs
@@ -21,6 +21,7 @@ crate::define_rpc_request! {
     }
 }
 
+// TODO: move out to traft as it's used not only in this rpc
 #[inline]
 pub fn wait_for_index_timeout(
     applied: RaftIndex,
diff --git a/test/conftest.py b/test/conftest.py
index 8133ac3b848d142160922894e45f23ebfacd33bb..8256d1274c172f9df44d9013da5b7edd5d82f7c7 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -533,7 +533,18 @@ class Instance:
 
         return t[1]
 
-    def ddl_create_space(
+    def create_space(self, params: dict, timeout: float = 3.0) -> int:
+        """
+        Creates a space. Returns the raft index at which the newly created
+        space must exist on all peers.
+
+        Works through the Lua API, unlike `propose_create_space`,
+        which is lower level and directly proposes a raft entry.
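+
+        Example (same parameter shape as in `test_ddl_create_space_lua`):
+
+            index = instance.create_space(
+                dict(
+                    id=1,
+                    name="some_name",
+                    format=[dict(name="id", type="unsigned", is_nullable=False)],
+                    primary_key=["id"],
+                    distribution="global",
+                )
+            )
+            instance.raft_wait_index(index)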
+        """
+        index = self.call("pico.create_space", params, timeout)
+        return index
+
+    def propose_create_space(
         self, space_def: Dict[str, Any], wait_index: bool = True, timeout: int = 3
     ) -> int:
         """
@@ -847,6 +858,14 @@ class Cluster:
             stderr=subprocess.PIPE,
         )
 
+    def create_space(self, params: dict, timeout: float = 3.0):
+        """
+        Creates a space. Waits for all peers to be aware of it.
+        """
+        index = self.instances[0].create_space(params, timeout)
+        for instance in self.instances:
+            instance.raft_wait_index(index)
+
     def cas(
         self,
         dml_kind: Literal["insert", "replace", "delete"],
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 8c4b6fe9322efd7196468022bd6bd13bfd2fcdb4..412b7ea7d718b8aa59b232bb728f5dcdc465471e 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -1,5 +1,38 @@
 import pytest
-from conftest import Cluster
+from conftest import Cluster, ReturnError
+
+
+def test_ddl_create_space_lua(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2)
+
+    # Successful space creation
+    cluster.create_space(
+        dict(
+            id=1,
+            name="some_name",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        )
+    )
+    pico_space_def = [1, "some_name", ["global"], [["id", "unsigned", False]], 1, True]
+    assert i1.call("box.space._pico_space:get", 1) == pico_space_def
+    assert i2.call("box.space._pico_space:get", 1) == pico_space_def
+
+    # Space creation error
+    with pytest.raises(ReturnError) as e1:
+        cluster.create_space(
+            dict(
+                id=2,
+                name="different_name",
+                format=[dict(name="id", type="unsigned", is_nullable=False)],
+                primary_key=["not_defined"],
+                distribution="global",
+            )
+        )
+    assert e1.value.args == (
+        "ddl failed: space creation failed: no field with name: not_defined",
+    )
 
 
 def test_ddl_create_space_bulky(cluster: Cluster):
@@ -21,7 +54,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     # Propose a space creation which will fail
 
     space_id = 713
-    abort_index = i1.ddl_create_space(
+    abort_index = i1.propose_create_space(
         dict(
             id=space_id,
             name="stuff",
@@ -31,6 +64,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
         ),
     )
 
+    # TODO: use `raft_wait_index`
     i1.call(".proc_sync_raft", abort_index, (3, 0))
     i2.call(".proc_sync_raft", abort_index, (3, 0))
     i3.call(".proc_sync_raft", abort_index, (3, 0))
@@ -61,7 +95,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     ############################################################################
     # Propose a space creation which will succeed
 
-    commit_index = i1.ddl_create_space(
+    commit_index = i1.propose_create_space(
         dict(
             id=space_id,
             name="stuff",
@@ -176,7 +210,7 @@ def test_ddl_create_sharded_space(cluster: Cluster):
     # Propose a space creation which will succeed
     schema_version = i1.next_schema_version()
     space_id = 679
-    index = i1.ddl_create_space(
+    index = i1.propose_create_space(
         dict(
             id=space_id,
             name="stuff",
@@ -298,7 +332,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
         primary_key=[dict(field="id")],
         distribution=dict(kind="global"),
     )
-    index = i1.ddl_create_space(space_def)
+    index = i1.propose_create_space(space_def)
 
     i2.call(".proc_sync_raft", index, (3, 0))
     i3.call(".proc_sync_raft", index, (3, 0))
@@ -316,7 +350,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
 
     # Propose the same space creation which this time succeeds, because there's
     # no conflict on any online instances.
-    index = i1.ddl_create_space(space_def)
+    index = i1.propose_create_space(space_def)
     i2.call(".proc_sync_raft", index, (3, 0))
 
     assert i1.call("box.space._space:get", space_id) is not None
@@ -350,7 +384,7 @@ def test_successful_wakeup_after_ddl(cluster: Cluster):
         primary_key=[dict(field="id")],
         distribution=dict(kind="global"),
     )
-    index = i1.ddl_create_space(space_def)
+    index = i1.propose_create_space(space_def)
 
     i2.call(".proc_sync_raft", index, (3, 0))
     i3.call(".proc_sync_raft", index, (3, 0))
@@ -382,7 +416,7 @@ def test_ddl_from_snapshot(cluster: Cluster):
     # TODO: check other ddl operations
     # Propose a space creation which will succeed
     space_id = 632
-    index = i1.ddl_create_space(
+    index = i1.propose_create_space(
         dict(
             id=space_id,
             name="stuff",
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index b56224e59b867c543a1d63c1940c8f2b9b398705..42e18b7e1b306cde21bb4f2d72c1e2d4b85ca0cd 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -65,7 +65,7 @@ def test_select(cluster: Cluster):
     i1, i2 = cluster.instances
 
     space_id = 739
-    index = i1.ddl_create_space(
+    index = i1.propose_create_space(
         dict(
             id=space_id,
             name="T",