diff --git a/src/luamod.lua b/src/luamod.lua
index ef45685d6d6d340a6206f0a9c632eb73dfacbdb7..c817a8be47b54850f540808b8294dedd99ef285c 100644
--- a/src/luamod.lua
+++ b/src/luamod.lua
@@ -47,6 +47,12 @@ function pico.help(topic)
     end
 end
 
+local function mandatory_param(value, name)
+    if value == nil then
+        box.error(box.error.ILLEGAL_PARAMS, name .. ' is mandatory')
+    end
+end
+
 -- Get next id unoccupied by a user or a role. Tarantool stores both users and
 -- roles in the same space, so they share the same set of ids.
 local function get_next_grantee_id()
@@ -908,6 +914,158 @@ function pico.revoke_privilege(grantee, privilege, object_type, object_name, opt
     return reenterable_schema_change_request(deadline, make_op_if_needed)
 end
 
+help.create_space = [[
+pico.create_space(opts)
+=======================
+
+Creates a space.
+
+Returns when the space is created globally and becomes operable on the
+current instance. Returns the index of the corresponding Op::DdlCommit
+raft entry. It's necessary for syncing with other instances.
+Skips the request if the space already exists.
+
+NOTE: If this function returns a timeout error, the space may have been locally
+created and in the future the change can either be committed or rolled back.
+
+Params:
+
+    1. opts (table)
+        - name (string)
+        - format (table {table SpaceField,...}), see pico.help('table SpaceField')
+        - primary_key (table {string,...}), with field names
+        - id (optional number), default: implicitly generated
+        - distribution (string), one of 'global' | 'sharded'
+            in case it's sharded, either `by_field` (for explicit sharding)
+            or `sharding_key`+`sharding_fn` (for implicit sharding) options
+            must be supplied.
+        - by_field (optional string), usually 'bucket_id'
+        - sharding_key (optional table {string,...}) with field names
+        - sharding_fn (optional string), only default 'murmur3' is supported for now
+        - timeout (number), in seconds
+
+Returns:
+
+    (number)
+    or
+    (nil, string) in case of an error
+
+Example:
+
+    -- Creates a global space 'friends_of_peppa' with two fields:
+    -- id (unsigned) and name (string).
+    pico.create_space({
+        name = 'friends_of_peppa',
+        format = {
+            {name = 'id', type = 'unsigned', is_nullable = false},
+            {name = 'name', type = 'string', is_nullable = false},
+        },
+        primary_key = {'id'},
+        distribution = 'global',
+        timeout = 3,
+    })
+
+    -- Global spaces are updated with compare-and-swap, see pico.help('cas')
+    pico.cas({
+        kind = 'insert',
+        space = 'friends_of_peppa',
+        tuple = {1, 'Suzy'},
+    })
+
+    -- Global spaces are read with Tarantool `box` API for now.
+    box.space.friends_of_peppa:fselect()
+
+    -- Creates an implicitly sharded space 'wonderland' with two fields:
+    -- property (string) and value (any).
+    pico.create_space({
+        name = 'wonderland',
+        format = {
+            {name = 'property', type = 'string', is_nullable = false},
+            {name = 'value', type = 'integer', is_nullable = true}
+        },
+        primary_key = {'property'},
+        distribution = 'sharded',
+        sharding_key = {'property'},
+        timeout = 3,
+    })
+
+    -- Calculate an SQL-compatible hash for the bucket id.
+    local key = require('key_def').new({{fieldno = 1, type = 'string'}})
+    local tuple = box.tuple.new({'unicorns'})
+    local bucket_id = key:hash(tuple) % vshard.router.bucket_count()
+
+    -- Sharded spaces are updated via vshard api, see [1]
+    vshard.router.callrw(bucket_id, 'box.space.wonderland:insert', {{'unicorns', 12}})
+
+See also:
+
+    [1]: https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_router/
+]]
+function pico.create_space(opts)
+    local ok, err = pcall(function()
+        box.internal.check_param_table(opts, {
+            name = 'string',
+            format = 'table',
+            primary_key = 'table',
+            id = 'number',
+            distribution = 'string',
+            by_field = 'string',
+            sharding_key = 'table',
+            sharding_fn = 'string',
+            timeout = 'number',
+        })
+        mandatory_param(opts, 'opts')
+        mandatory_param(opts.name, 'opts.name')
+        mandatory_param(opts.format, 'opts.format')
+        mandatory_param(opts.primary_key, 'opts.primary_key')
+        mandatory_param(opts.distribution, 'opts.distribution')
+        mandatory_param(opts.timeout, 'opts.timeout')
+
+        local ok, err = pico._check_create_space_opts(opts)
+        if not ok then
+            box.error(box.error.ILLEGAL_PARAMS, err)
+        end
+    end)
+    if not ok then
+        return nil, err
+    end
+
+    local deadline = fiber.clock() + opts.timeout
+
+    local should_wait_for_ddl_fin = true
+
+    -- XXX: we construct this closure every time the function is called,
+    -- which is bad for performance/jit. Refactor if problems are discovered.
+    local function make_op_if_needed()
+        local op, err = pico._make_create_space_op_if_needed(opts)
+        if op ~= nil then
+            return op
+        elseif err ~= nil then
+            error(err)
+        else
+            -- No op needed
+            should_wait_for_ddl_fin = false
+            return
+        end
+    end
+
+    local index, err = reenterable_schema_change_request(deadline, make_op_if_needed)
+    if index == nil then
+        return nil, err
+    end
+
+    if not should_wait_for_ddl_fin then
+        return index
+    end
+
+    local fin_index, err = pico.wait_ddl_finalize(index, { timeout = deadline - fiber.clock() })
+    if fin_index == nil then
+        return nil, err
+    end
+
+    return fin_index
+end
+
 help.drop_space = [[
 pico.drop_space(space, [opts])
 ======================
diff --git a/src/luamod.rs b/src/luamod.rs
index 4a8de8d77af62cceef10e17d7e59d76cd4683b4a..3df829452ba527524b60bf15544ec7ad0337029e 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -675,7 +675,7 @@ pub(crate) fn setup(args: &args::Run) {
 
         Params:
 
-            1. op (table)
+            1. op (table | string)
             2. index (number)
             3. timeout (number)
 
@@ -689,15 +689,26 @@ pub(crate) fn setup(args: &args::Run) {
             |lua: tlua::StaticLua| -> traft::Result<(RaftIndex, RaftTerm)> {
                 use tlua::{AnyLuaString, AsLua, LuaError, LuaTable};
 
-                let t: LuaTable<_> = (&lua).read_at(1).map_err(|(_, e)| LuaError::from(e))?;
-                // We do [lua value -> msgpack -> rust -> msgpack]
-                // instead of [lua value -> rust -> msgpack]
-                // because despite what it may seem this is much simpler.
-                // (The final [-> msgpack] is when we eventually do the rpc).
-                // The transmition medium is always msgpack.
-                let mp: AnyLuaString = lua
-                    .eval_with("return require 'msgpack'.encode(...)", &t)
-                    .map_err(LuaError::from)?;
+                let mp: AnyLuaString;
+                let t: Option<LuaTable<_>> = (&lua).read_at(1).ok();
+                if let Some(t) = t {
+                    // We do [lua value -> msgpack -> rust -> msgpack]
+                    // instead of [lua value -> rust -> msgpack]
+                    // because despite what it may seem this is much simpler.
+                    // (The final [-> msgpack] is when we eventually do the rpc).
+                    // The transmition medium is always msgpack.
+                    mp = lua
+                        .eval_with("return require 'msgpack'.encode(...)", &t)
+                        .map_err(LuaError::from)?;
+                } else {
+                    let s: Option<AnyLuaString> = (&lua).read_at(1).ok();
+                    if let Some(s) = s {
+                        mp = s;
+                    } else {
+                        return Err(Error::other("op should be a table or a string"));
+                    }
+                }
+
                 let op: Op = Decode::decode(mp.as_bytes())?;
                 if !op.is_schema_change() {
                     return Err(Error::other(
@@ -1195,107 +1206,75 @@ pub(crate) fn setup(args: &args::Run) {
 
     luamod_set(
         &l,
-        "create_space",
+        "_check_create_space_opts",
         indoc! {"
-        pico.create_space(opts)
-        =======================
-
-        Creates a space.
+        pico._check_create_space_opts(opts)
+        =================================
 
-        Returns when the space is created globally and becomes operable on the
-        current instance. Returns the index of the corresponding Op::DdlCommit
-        raft entry. It's necessary for syncing with other instances.
+        Internal API, see src/luamod.rs for the details.
 
         Params:
 
             1. opts (table)
-                - name (string)
-                - format (table {table SpaceField,...}), see pico.help('table SpaceField')
-                - primary_key (table {string,...}), with field names
-                - id (optional number), default: implicitly generated
-                - distribution (string), one of 'global' | 'sharded'
-                    in case it's sharded, either `by_field` (for explicit sharding)
-                    or `sharding_key`+`sharding_fn` (for implicit sharding) options
-                    must be supplied.
-                - by_field (optional string), usually 'bucket_id'
-                - sharding_key (optional table {string,...}) with field names
-                - sharding_fn (optional string), only default 'murmur3' is supported for now
-                - timeout (number), in seconds
 
         Returns:
 
-            (number)
+            (true)
             or
             (nil, string) in case of an error
+        "},
+        tlua::function1(|params: CreateSpaceParams| -> traft::Result<bool> {
+            params.validate()?;
+            Ok(true)
+        }),
+    );
+    luamod_set(
+        &l,
+        "_make_create_space_op_if_needed",
+        indoc! {"
+        pico._make_create_space_op_if_needed(opts)
+        =================================
 
-        Example:
-
-            -- Creates a global space 'friends_of_peppa' with two fields:
-            -- id (unsigned) and name (string).
-            pico.create_space({
-                name = 'friends_of_peppa',
-                format = {
-                    {name = 'id', type = 'unsigned', is_nullable = false},
-                    {name = 'name', type = 'string', is_nullable = false},
-                },
-                primary_key = {'id'},
-                distribution = 'global',
-                timeout = 3,
-            })
-
-            -- Global spaces are updated with compare-and-swap, see pico.help('cas')
-            pico.cas({
-                kind = 'insert',
-                space = 'friends_of_peppa',
-                tuple = {1, 'Suzy'},
-            })
-
-            -- Global spaces are read with Tarantool `box` API for now.
-            box.space.friends_of_peppa:fselect()
-
-            -- Creates an implicitly sharded space 'wonderland' with two fields:
-            -- property (string) and value (any).
-            pico.create_space({
-                name = 'wonderland',
-                format = {
-                    {name = 'property', type = 'string', is_nullable = false},
-                    {name = 'value', type = 'integer', is_nullable = true}
-                },
-                primary_key = {'property'},
-                distribution = 'sharded',
-                sharding_key = {'property'},
-                timeout = 3,
-            })
+        Internal API, see src/luamod.rs for the details.
 
-            -- Calculate an SQL-compatible hash for the bucket id.
-            local key = require('key_def').new({{fieldno = 1, type = 'string'}})
-            local tuple = box.tuple.new({'unicorns'})
-            local bucket_id = key:hash(tuple) % vshard.router.bucket_count()
+        Params:
 
-            -- Sharded spaces are updated via vshard api, see [1]
-            vshard.router.callrw(bucket_id, 'box.space.wonderland:insert', {{'unicorns', 12}})
+            1. opts (table), space create opts
 
-        See also:
+        Returns:
 
-            [1]: https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_router/
+            (string) raft op encoded as msgpack
+            or
+            (nil) in case no operation is needed
+            or
+            (nil, string) in case of conflict
         "},
-        {
-            tlua::function1(|params: CreateSpaceParams| -> traft::Result<RaftIndex> {
-                let timeout = Duration::from_secs_f64(params.timeout);
+        tlua::function1(
+            |mut params: CreateSpaceParams| -> traft::Result<Option<tlua::AnyLuaString>> {
                 let storage = &node::global()?.storage;
-                let mut params = params.validate(storage)?;
-                params.test_create_space(storage)?;
-                let ddl = params.into_ddl(storage)?;
+                if params.space_exists()? {
+                    return Ok(None);
+                }
+                params.choose_id_if_not_specified()?;
+                params.test_create_space()?;
+                let ddl = params.into_ddl()?;
                 let schema_version = storage.properties.next_schema_version()?;
                 let op = Op::DdlPrepare {
                     schema_version,
                     ddl,
                 };
-                let index = schema::prepare_schema_change(op, timeout)?;
-                let commit_index = schema::wait_for_ddl_commit(index, timeout)?;
-                Ok(commit_index)
-            })
-        },
+                // FIXME: this is stupid, we serialize op into msgpack just to
+                // pass it via lua into another rust callback where it will be
+                // deserialized back, just to get serialized once again for
+                // the rpc (not to mention that inside the rpc it will once
+                // again be serialized to be put into the raft log). It is
+                // however much better than converting this op to a lua value
+                // and back. Anyway this shouldn't be a perf problem because
+                // this is not a very hot function.
+                let mp = rmp_serde::to_vec_named(&op).expect("raft op shouldn't fail to serialize");
+                Ok(Some(tlua::AnyLuaString(mp)))
+            },
+        ),
     );
     luamod_set(
         &l,
diff --git a/src/schema.rs b/src/schema.rs
index 1ad945e2655893a6a1c3b0ea9a808e60b490d3fb..ed718eac9b92b3761a4f19aa3ff54d3b74e0c260 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -276,7 +276,7 @@ pub fn try_space_field_type_to_index_field_type(
 
 #[derive(Debug, thiserror::Error)]
 pub enum DdlError {
-    #[error("space creation failed: {0}")]
+    #[error("{0}")]
     CreateSpace(#[from] CreateSpaceError),
     #[error("ddl operation was aborted")]
     Aborted,
@@ -286,10 +286,12 @@ pub enum DdlError {
 
 #[derive(Debug, thiserror::Error)]
 pub enum CreateSpaceError {
-    #[error("space with id {0} already exists")]
-    IdExists(SpaceId),
-    #[error("space with name {0} already exists")]
-    NameExists(String),
+    #[error("space with id {id} exists with a different name '{actual_name}', but expected '{expected_name}'")]
+    ExistsWithDifferentName {
+        id: SpaceId,
+        expected_name: String,
+        actual_name: String,
+    },
     #[error("several fields have the same name: {0}")]
     DuplicateFieldName(String),
     #[error("no field with name: {0}")]
@@ -351,16 +353,45 @@ pub struct CreateSpaceParams {
 }
 
 impl CreateSpaceParams {
-    pub fn validate(self, storage: &Clusterwide) -> traft::Result<ValidCreateSpaceParams> {
-        // Check that there is no space with this name
-        if storage.spaces.by_name(&self.name)?.is_some() {
-            return Err(CreateSpaceError::NameExists(self.name).into());
+    /// Checks if space described by options already exists. Returns an error if
+    /// the space with given id exists, but has a different name.
+    pub fn space_exists(&self) -> traft::Result<bool> {
+        // The check is performed using `box.space` API, so that local spaces are counted too.
+        let sys_space = Space::from(SystemSpace::Space);
+
+        let Some(id) = self.id else {
+            let sys_space_by_name = sys_space
+                .index_cached("name")
+                .expect("_space should have an index by name");
+            let t = sys_space_by_name
+                .get(&[&self.name])
+                .expect("reading from _space shouldn't fail");
+            return Ok(t.is_some());
+        };
+
+        let t = sys_space
+            .get(&[id])
+            .expect("reading from _space shouldn't fail");
+        let Some(t) = t else { return Ok(false); };
+
+        let existing_name: &str = t.get("name").expect("space metadata should contain a name");
+        if existing_name == self.name {
+            return Ok(true);
+        } else {
+            // TODO: check everything else is the same
+            // https://git.picodata.io/picodata/picodata/picodata/-/issues/331
+            return Err(CreateSpaceError::ExistsWithDifferentName {
+                id,
+                expected_name: self.name.clone(),
+                actual_name: existing_name.into(),
+            }
+            .into());
         }
-        // Check that there is no space with this id (if specified)
+    }
+
+    pub fn validate(&self) -> traft::Result<()> {
+        // Check space id fits in the allowed range
         if let Some(id) = self.id {
-            if storage.spaces.get(id)?.is_some() {
-                return Err(CreateSpaceError::IdExists(id).into());
-            }
             if id <= SPACE_ID_INTERNAL_MAX {
                 crate::tlog!(Warning, "requested space id {id} is in the range 0..={SPACE_ID_INTERNAL_MAX} reserved for future use by picodata, you may have a conflict in a future version");
             }
@@ -429,35 +460,28 @@ impl CreateSpaceParams {
                 );
             }
         }
-        Ok(ValidCreateSpaceParams(self))
+        Ok(())
     }
-}
-
-#[derive(Debug)]
-pub struct ValidCreateSpaceParams(CreateSpaceParams);
 
-impl ValidCreateSpaceParams {
     /// Create space and then rollback.
     ///
     /// Should be used for checking if a space with these params can be created.
-    pub fn test_create_space(&mut self, storage: &Clusterwide) -> traft::Result<()> {
-        let id = self.id(storage)?;
-        let params = &self.0;
+    pub fn test_create_space(&self) -> traft::Result<()> {
+        let id = self.id.expect("space id should've been chosen by now");
         let err = transaction(|| -> Result<(), Option<tarantool::error::Error>> {
             ::tarantool::schema::space::create_space(
-                &params.name,
+                &self.name,
                 &SpaceCreateOptions {
                     if_not_exists: false,
                     engine: SpaceEngineType::Memtx,
                     id: Some(id),
-                    field_count: params.format.len() as u32,
+                    field_count: self.format.len() as u32,
                     user: None,
                     is_local: false,
                     is_temporary: false,
                     is_sync: false,
                     format: Some(
-                        params
-                            .format
+                        self.format
                             .iter()
                             .cloned()
                             .map(tarantool::space::Field::from)
@@ -480,12 +504,11 @@ impl ValidCreateSpaceParams {
         }
     }
 
-    /// Memoizes id if it is automatically selected.
-    fn id(&mut self, storage: &Clusterwide) -> traft::Result<SpaceId> {
-        let _ = storage;
+    /// Chooses an id for the new space if it's not set yet and sets `self.id`.
+    pub fn choose_id_if_not_specified(&mut self) -> traft::Result<()> {
         let sys_space = Space::from(SystemSpace::Space);
 
-        let id = if let Some(id) = self.0.id {
+        let id = if let Some(id) = self.id {
             id
         } else {
             let mut id = SPACE_ID_INTERNAL_MAX;
@@ -513,38 +536,36 @@ impl ValidCreateSpaceParams {
             }
             id + 1
         };
-        self.0.id = Some(id);
-        Ok(id)
+        self.id = Some(id);
+        Ok(())
     }
 
-    pub fn into_ddl(mut self, storage: &Clusterwide) -> traft::Result<Ddl> {
-        let id = self.id(storage)?;
-        let primary_key: Vec<_> = self.0.primary_key.into_iter().map(Part::field).collect();
+    pub fn into_ddl(self) -> traft::Result<Ddl> {
+        let id = self.id.expect("space id should've been chosen by now");
+        let primary_key: Vec<_> = self.primary_key.into_iter().map(Part::field).collect();
         let format: Vec<_> = self
-            .0
             .format
             .into_iter()
             .map(tarantool::space::Field::from)
             .collect();
-        let distribution = match self.0.distribution {
+        let distribution = match self.distribution {
             DistributionParam::Global => Distribution::Global,
             DistributionParam::Sharded => {
-                if let Some(field) = self.0.by_field {
+                if let Some(field) = self.by_field {
                     Distribution::ShardedByField { field }
                 } else {
                     Distribution::ShardedImplicitly {
                         sharding_key: self
-                            .0
                             .sharding_key
                             .expect("should be checked during `validate`"),
-                        sharding_fn: self.0.sharding_fn.unwrap_or_default(),
+                        sharding_fn: self.sharding_fn.unwrap_or_default(),
                     }
                 }
             }
         };
         let res = Ddl::CreateSpace {
             id,
-            name: self.0.name,
+            name: self.name,
             format,
             primary_key,
             distribution,
@@ -684,9 +705,8 @@ mod tests {
 
     #[::tarantool::test]
     fn test_create_space() {
-        let storage = Clusterwide::new().unwrap();
-        ValidCreateSpaceParams(CreateSpaceParams {
-            id: None,
+        CreateSpaceParams {
+            id: Some(1337),
             name: "friends_of_peppa".into(),
             format: vec![
                 Field {
@@ -706,12 +726,12 @@ mod tests {
             sharding_key: None,
             sharding_fn: None,
             timeout: 0.0,
-        })
-        .test_create_space(&storage)
+        }
+        .test_create_space()
         .unwrap();
         assert!(tarantool::space::Space::find("friends_of_peppa").is_none());
 
-        let err = ValidCreateSpaceParams(CreateSpaceParams {
+        let err = CreateSpaceParams {
             id: Some(0),
             name: "friends_of_peppa".into(),
             format: vec![],
@@ -721,8 +741,8 @@ mod tests {
             sharding_key: None,
             sharding_fn: None,
             timeout: 0.0,
-        })
-        .test_create_space(&storage)
+        }
+        .test_create_space()
         .unwrap_err();
         assert_eq!(
             err.to_string(),
@@ -732,21 +752,6 @@ mod tests {
 
     #[::tarantool::test]
     fn ddl() {
-        let storage = Clusterwide::new().unwrap();
-        let existing_space = "existing_space";
-        let existing_id = 0;
-        storage
-            .spaces
-            .insert(&SpaceDef {
-                id: existing_id,
-                name: existing_space.into(),
-                distribution: Distribution::Global,
-                format: vec![],
-                schema_version: 0,
-                operable: true,
-            })
-            .unwrap();
-
         let new_space = "new_space";
         let new_id = 1;
         let field1 = Field {
@@ -760,42 +765,6 @@ mod tests {
             is_nullable: false,
         };
 
-        let err = CreateSpaceParams {
-            id: Some(existing_id),
-            name: new_space.into(),
-            format: vec![],
-            primary_key: vec![],
-            distribution: DistributionParam::Global,
-            by_field: None,
-            sharding_key: None,
-            sharding_fn: None,
-            timeout: 0.0,
-        }
-        .validate(&storage)
-        .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: space with id 0 already exists"
-        );
-
-        let err = CreateSpaceParams {
-            id: Some(new_id),
-            name: existing_space.into(),
-            format: vec![],
-            primary_key: vec![],
-            distribution: DistributionParam::Global,
-            by_field: None,
-            sharding_key: None,
-            sharding_fn: None,
-            timeout: 0.0,
-        }
-        .validate(&storage)
-        .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: space with name existing_space already exists"
-        );
-
         let err = CreateSpaceParams {
             id: Some(new_id),
             name: new_space.into(),
@@ -807,12 +776,9 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: several fields have the same name: field1"
-        );
+        assert_eq!(err.to_string(), "several fields have the same name: field1");
 
         let err = CreateSpaceParams {
             id: Some(new_id),
@@ -825,12 +791,9 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: no field with name: field2"
-        );
+        assert_eq!(err.to_string(), "no field with name: field2");
 
         let err = CreateSpaceParams {
             id: Some(new_id),
@@ -843,12 +806,9 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: no field with name: field2"
-        );
+        assert_eq!(err.to_string(), "no field with name: field2");
 
         let err = CreateSpaceParams {
             id: Some(new_id),
@@ -861,12 +821,9 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: several fields have the same name: field1"
-        );
+        assert_eq!(err.to_string(), "several fields have the same name: field1");
 
         let err = CreateSpaceParams {
             id: Some(new_id),
@@ -879,12 +836,9 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "ddl failed: space creation failed: no field with name: field2"
-        );
+        assert_eq!(err.to_string(), "no field with name: field2");
 
         let err = CreateSpaceParams {
             id: Some(new_id),
@@ -897,11 +851,11 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
         assert_eq!(
             err.to_string(),
-            "ddl failed: space creation failed: distribution is `sharded`, but neither `by_field` nor `sharding_key` is set"
+            "distribution is `sharded`, but neither `by_field` nor `sharding_key` is set"
         );
 
         let err = CreateSpaceParams {
@@ -915,11 +869,11 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap_err();
         assert_eq!(
             err.to_string(),
-            "ddl failed: space creation failed: only one of sharding policy fields (`by_field`, `sharding_key`) should be set"
+            "only one of sharding policy fields (`by_field`, `sharding_key`) should be set"
         );
 
         CreateSpaceParams {
@@ -933,7 +887,7 @@ mod tests {
             sharding_fn: None,
             timeout: 0.0,
         }
-        .validate(&storage)
+        .validate()
         .unwrap();
     }
 }
diff --git a/src/sql.rs b/src/sql.rs
index 7f5fba821ec47201b9e784db83352f1f5f7c5e3c..cc0fd11c10b67b3be865e809b79d10ff1770a09c 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -69,7 +69,7 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
                                 is_nullable: f.is_nullable,
                             })
                             .collect();
-                        let params = CreateSpaceParams {
+                        let mut params = CreateSpaceParams {
                             id: None,
                             name,
                             format,
@@ -80,10 +80,14 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
                             sharding_fn: Some(ShardingFn::Murmur3),
                             timeout,
                         };
-                        let storage = &node::global()?.storage;
-                        let mut params = params.validate(storage)?;
-                        params.test_create_space(storage)?;
-                        params.into_ddl(storage)?
+                        params.validate()?;
+                        if params.space_exists()? {
+                            let result = ConsumerResult { row_count: 0 };
+                            return Ok(Tuple::new(&[result])?);
+                        }
+                        params.choose_id_if_not_specified()?;
+                        params.test_create_space()?;
+                        params.into_ddl()?
                     }
                     Ddl::DropTable { ref name, .. } => {
                         let space_def: SpaceDef =
diff --git a/src/traft/error.rs b/src/traft/error.rs
index 7915c205d8b5557f9d5c252b0985c1b5b8dc3fd8..d8aa2814a9e6aedc1a46e76fa427911af943f80b 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -67,7 +67,7 @@ pub enum Error {
 
     #[error("compare-and-swap: {0}")]
     Cas(#[from] crate::cas::Error),
-    #[error("ddl failed: {0}")]
+    #[error("{0}")]
     Ddl(#[from] crate::schema::DdlError),
 
     #[error("sbroad: {0}")]
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 1e70c63f2c3b40135dc1358e90a481b6c30e5f70..ed6fafe999d96a9ea789ad2068927ad039b8bdcc 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -6,9 +6,8 @@ from conftest import Cluster, ReturnError
 def test_ddl_abort(cluster: Cluster):
     cluster.deploy(instance_count=2)
 
-    with pytest.raises(ReturnError) as e1:
+    with pytest.raises(ReturnError, match="there is no pending ddl operation"):
         cluster.abort_ddl()
-    assert e1.value.args == ("ddl failed: there is no pending ddl operation",)
 
     # TODO: test manual abort when we have long-running ddls
 
@@ -17,30 +16,50 @@ def test_ddl_abort(cluster: Cluster):
 def test_ddl_lua_api(cluster: Cluster):
     i1, i2 = cluster.deploy(instance_count=2)
 
+    #
+    # pico.create_space
+    #
+
     # Successful global space creation
-    space_id = 1026
     cluster.create_space(
         dict(
-            id=space_id,
+            id=1026,
             name="some_name",
             format=[dict(name="id", type="unsigned", is_nullable=False)],
             primary_key=["id"],
             distribution="global",
         )
     )
-    pico_space_def = [
-        space_id,
-        "some_name",
-        ["global"],
-        [["id", "unsigned", False]],
-        1,
-        True,
-    ]
-    assert i1.call("box.space._pico_space:get", space_id) == pico_space_def
-    assert i2.call("box.space._pico_space:get", space_id) == pico_space_def
 
-    # Space creation error
-    with pytest.raises(ReturnError) as e1:
+    # Called with the same args -> ok.
+    cluster.create_space(
+        dict(
+            id=1026,
+            name="some_name",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        )
+    )
+
+    # FIXME: this should fail:
+    # see https://git.picodata.io/picodata/picodata/picodata/-/issues/331
+    # Called with same name/id but different format -> error.
+    cluster.create_space(
+        dict(
+            id=1026,
+            name="some_name",
+            format=[
+                dict(name="key", type="string", is_nullable=False),
+                dict(name="value", type="any", is_nullable=False),
+            ],
+            primary_key=["key"],
+            distribution="global",
+        )
+    )
+
+    # No such field for primary key -> error.
+    with pytest.raises(ReturnError, match="no field with name: not_defined"):
         cluster.create_space(
             dict(
                 id=1027,
@@ -50,9 +69,6 @@ def test_ddl_lua_api(cluster: Cluster):
                 distribution="global",
             )
         )
-    assert e1.value.args == (
-        "ddl failed: space creation failed: no field with name: not_defined",
-    )
 
     # Automatic space id
     cluster.create_space(