diff --git a/src/cas.rs b/src/cas.rs
index 19be462f3c8bb1bcfcb1baa3e4382e8c76daf6d5..4c4f5cde47e9ee568f57bdc51bdc69fc7cff5a82 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -26,8 +26,11 @@ use tarantool::tuple::{KeyDef, ToTupleBuffer, Tuple, TupleBuffer};
 
 use once_cell::sync::Lazy;
 
-const PROHIBITED_SPACES: &[ClusterwideSpaceId] =
-    &[ClusterwideSpaceId::Space, ClusterwideSpaceId::Index];
+const PROHIBITED_SPACES: &[ClusterwideSpaceId] = &[
+    ClusterwideSpaceId::Space,
+    ClusterwideSpaceId::Index,
+    ClusterwideSpaceId::User,
+];
 
 /// Performs a clusterwide compare and swap operation.
 ///
@@ -368,7 +371,7 @@ impl Predicate {
                         return Err(error());
                     }
                 }
-                Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => {
+                Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort | Op::Acl { .. } => {
                     let key_def = storage.key_def_for_key(space, 0)?;
                     for key in ddl_keys.iter() {
                         if range.contains(&key_def, key) {
@@ -547,7 +550,7 @@ impl Bound {
 fn space(op: &Op) -> Option<SpaceId> {
     match op {
         Op::Dml(dml) => Some(dml.space()),
-        Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => {
+        Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort | Op::Acl { .. } => {
             Some(ClusterwideSpaceId::Property.into())
         }
         Op::Nop => None,
diff --git a/src/schema.rs b/src/schema.rs
index b53a07b67cfe0147b393448c4f26f5a5a93a41f3..56fc4a264f45d57e1d19d0f370245a98058d95f4 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -25,6 +25,10 @@ use crate::traft::op::{Ddl, DdlBuilder, Op};
 use crate::traft::{self, event, node, RaftIndex};
 use crate::util::instant_saturating_add;
 
+////////////////////////////////////////////////////////////////////////////////
+// SpaceDef
+////////////////////////////////////////////////////////////////////////////////
+
 /// Space definition.
 ///
 /// Describes a user-defined space.
@@ -81,6 +85,10 @@ impl SpaceDef {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Distribution
+////////////////////////////////////////////////////////////////////////////////
+
 /// Defines how to distribute tuples in a space across replicasets.
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead)]
 #[serde(rename_all = "snake_case")]
@@ -121,6 +129,10 @@ fn default_bucket_id_field() -> String {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// IndexDef
+////////////////////////////////////////////////////////////////////////////////
+
 /// Index definition.
 ///
 /// Describes a user-defined index.
@@ -160,6 +172,46 @@ impl IndexDef {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// UserDef
+////////////////////////////////////////////////////////////////////////////////
+
+/// User definition.
+///
+/// Describes a user-defined index.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct UserDef {
+    pub id: UserId,
+    pub name: String,
+    pub schema_version: u64,
+    pub auth: AuthDef,
+}
+
+pub type UserId = u32;
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct AuthDef {
+    pub method: AuthMethod,
+    /// Base64 encoded digest.
+    pub data: String,
+}
+
+::tarantool::define_str_enum! {
+    pub enum AuthMethod {
+        ChapSha1 = "chap-sha1",
+    }
+}
+
+impl Encode for UserDef {}
+
+impl UserDef {
+    // TODO
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// ...
+////////////////////////////////////////////////////////////////////////////////
+
 // TODO: this should be a TryFrom in tarantool-module
 pub fn try_space_field_type_to_index_field_type(
     ft: tarantool::space::FieldType,
diff --git a/src/storage.rs b/src/storage.rs
index 19de71ba961f918d774084fb9247a8cf6e652531..8e04f898d7b2c6797647f4b9cfd6d5a7eb1ee7ae 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -3,6 +3,7 @@ use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType};
 use ::tarantool::msgpack::{ArrayWriter, ValueIter};
 use ::tarantool::space::UpdateOps;
 use ::tarantool::space::{FieldType, Space, SpaceId, SystemSpace};
+use ::tarantool::tlua::{self, LuaError};
 use ::tarantool::tuple::KeyDef;
 use ::tarantool::tuple::{Decode, DecodeOwned, Encode};
 use ::tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer};
@@ -10,7 +11,7 @@ use ::tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer};
 use crate::failure_domain as fd;
 use crate::instance::{self, grade, Instance};
 use crate::replicaset::{Replicaset, ReplicasetId};
-use crate::schema::{Distribution, IndexDef, SpaceDef};
+use crate::schema::{Distribution, IndexDef, SpaceDef, UserDef, UserId};
 use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
@@ -475,6 +476,21 @@ define_clusterwide_spaces! {
             #[allow(clippy::enum_variant_names)]
             pub enum SpaceIndexIndex;
         }
+        User = 520, "_pico_user" => {
+            Clusterwide::users;
+
+            /// A struct for accessing info of all the user-defined users.
+            pub struct Users {
+                space: Space,
+                #[primary]
+                index_id: Index => Id = "id",
+                index_name: Index => Name = "name",
+            }
+
+            /// An enumeration of indexes defined for "_pico_user".
+            #[allow(clippy::enum_variant_names)]
+            pub enum SpaceUserIndex;
+        }
     }
 
     /// An index of a clusterwide space.
@@ -550,12 +566,17 @@ impl Clusterwide {
     pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> {
         debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() });
 
-        // We need to save these before truncating _pico_space.
+        // These need to be saved before we truncate the corresponding space.
         let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new();
         for space_def in self.spaces.iter()? {
             old_space_versions.insert(space_def.id, space_def.schema_version);
         }
 
+        let mut old_user_versions = HashMap::new();
+        for user_def in self.users.iter()? {
+            old_user_versions.insert(user_def.id, user_def.schema_version);
+        }
+
         let mut dont_exist_yet = Vec::new();
         for space_dump in &data.space_dumps {
             let space_name = &space_dump.space_name;
@@ -574,6 +595,7 @@ impl Clusterwide {
         // via tarantool replication.
         if is_master {
             self.apply_ddl_changes_on_master(&old_space_versions)?;
+            self.apply_acl_changes_on_master(&old_user_versions)?;
             set_local_schema_version(data.schema_version)?;
         }
 
@@ -670,6 +692,52 @@ impl Clusterwide {
         Ok(())
     }
 
+    pub fn apply_acl_changes_on_master(
+        &self,
+        old_user_versions: &HashMap<UserId, u64>,
+    ) -> traft::Result<()> {
+        let mut user_defs = Vec::new();
+        let mut new_user_ids = HashSet::new();
+        for user_def in self.users.iter()? {
+            new_user_ids.insert(user_def.id);
+            user_defs.push(user_def);
+        }
+
+        // First we drop all users which have been dropped.
+        for &old_user_id in old_user_versions.keys() {
+            if new_user_ids.contains(&old_user_id) {
+                // Will be handled later.
+                continue;
+            }
+
+            // User was dropped.
+            acl_drop_user_on_master(old_user_id)?;
+        }
+
+        // Now create any new users, or replace ones that changed.
+        for user_def in &user_defs {
+            let user_id = user_def.id;
+            if let Some(&v_old) = old_user_versions.get(&user_id) {
+                let v_new = user_def.schema_version;
+                assert!(v_old <= v_new);
+
+                if v_old == v_new {
+                    // User def is up to date.
+                    continue;
+                }
+
+                // User def changed, need to drop it and recreate.
+                acl_drop_user_on_master(user_id)?;
+            } else {
+                // New user.
+            }
+
+            acl_create_user_on_master(user_def)?;
+        }
+
+        Ok(())
+    }
+
     /// Return a `KeyDef` to be used for comparing **tuples** of the
     /// corresponding global space.
     pub(crate) fn key_def(
@@ -1845,6 +1913,154 @@ pub fn ddl_drop_space_on_master(space_id: SpaceId) -> traft::Result<Option<TntEr
     Ok(res.err())
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Users
+////////////////////////////////////////////////////////////////////////////////
+
+impl Users {
+    pub fn new() -> tarantool::Result<Self> {
+        let space = Space::builder(Self::SPACE_NAME)
+            .id(Self::SPACE_ID)
+            .is_local(true)
+            .is_temporary(false)
+            .field(("id", FieldType::Unsigned))
+            .field(("name", FieldType::String))
+            .field(("schema_version", FieldType::Unsigned))
+            .field(("auth", FieldType::Array))
+            .if_not_exists(true)
+            .create()?;
+
+        let index_id = space
+            .index_builder(IndexOf::<Self>::Id.as_str())
+            .unique(true)
+            .part("id")
+            .if_not_exists(true)
+            .create()?;
+
+        let index_name = space
+            .index_builder(IndexOf::<Self>::Name.as_str())
+            .unique(true)
+            .part("name")
+            .if_not_exists(true)
+            .create()?;
+
+        Ok(Self {
+            space,
+            index_id,
+            index_name,
+        })
+    }
+
+    #[inline]
+    pub fn by_id(&self, user_id: UserId) -> tarantool::Result<Option<UserDef>> {
+        let tuple = self.space.get(&[user_id])?;
+        let mut res = None;
+        if let Some(tuple) = tuple {
+            res = Some(tuple.decode()?);
+        }
+        Ok(res)
+    }
+
+    #[inline]
+    pub fn by_name(&self, user_name: &str) -> tarantool::Result<Option<UserDef>> {
+        let tuple = self.index_name.get(&[user_name])?;
+        let mut res = None;
+        if let Some(tuple) = tuple {
+            res = Some(tuple.decode()?);
+        }
+        Ok(res)
+    }
+
+    #[inline]
+    pub fn replace(&self, user_def: &UserDef) -> tarantool::Result<()> {
+        self.space.replace(user_def)?;
+        Ok(())
+    }
+
+    #[inline]
+    pub fn insert(&self, user_def: &UserDef) -> tarantool::Result<()> {
+        self.space.insert(user_def)?;
+        Ok(())
+    }
+}
+
+impl ToEntryIter for Users {
+    type Entry = UserDef;
+
+    #[inline(always)]
+    fn index_iter(&self) -> Result<IndexIterator> {
+        Ok(self.space.select(IteratorType::All, &())?)
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// acl global
+////////////////////////////////////////////////////////////////////////////////
+
+/// Persist a user definition in the internal clusterwide storage.
+pub fn acl_global_create_user(storage: &Clusterwide, user_def: &UserDef) -> tarantool::Result<()> {
+    storage.users.insert(user_def)?;
+    Ok(())
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// acl
+////////////////////////////////////////////////////////////////////////////////
+
+/// Create a tarantool user. Grant it default privileges.
+pub fn acl_create_user_on_master(user_def: &UserDef) -> tarantool::Result<()> {
+    let sys_user = Space::from(SystemSpace::User);
+
+    // This impelemtation was copied from box.schema.user.create excluding the
+    // password hashing.
+    let user_id = user_def.id;
+    let euid = ::tarantool::session::euid()?;
+
+    // Tarantool expects auth info to be a map of form `{ method: data }`,
+    // and currently the simplest way to achieve this is to use a HashMap.
+    let auth_map = HashMap::from([(user_def.auth.method, &user_def.auth.data)]);
+    sys_user.insert(&(
+        user_id,
+        euid,
+        &user_def.name,
+        "user",
+        auth_map,
+        &[(); 0],
+        0,
+    ))?;
+
+    let lua = ::tarantool::lua_state();
+    lua.exec_with("box.schema.user.grant(...)", (user_id, "public"))
+        .map_err(LuaError::from)?;
+    lua.exec_with(
+        "box.schema.user.grant(...)",
+        (user_id, "alter", "user", user_id),
+    )
+    .map_err(LuaError::from)?;
+    lua.exec_with(
+        "box.session.su('admin', box.schema.user.grant, ...)",
+        (
+            user_id,
+            "session,usage",
+            "universe",
+            tlua::Nil,
+            tlua::AsTable((("if_not_exists", true),)),
+        ),
+    )
+    .map_err(LuaError::from)?;
+
+    Ok(())
+}
+
+/// Drop a tarantool user and any entities (spaces, etc.) owned by it.
+pub fn acl_drop_user_on_master(user_id: UserId) -> tarantool::Result<()> {
+    let lua = ::tarantool::lua_state();
+    lua.exec_with("box.schema.user.drop(...)", user_id)
+        .map_err(LuaError::from)?;
+
+    Ok(())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // local schema version
 ////////////////////////////////////////////////////////////////////////////////
@@ -2111,7 +2327,8 @@ mod tests {
         let snapshot_data = Clusterwide::snapshot_data().unwrap();
         let space_dumps = snapshot_data.space_dumps;
 
-        assert_eq!(space_dumps.len(), 6);
+        let n_internal_spaces = ClusterwideSpace::values().len();
+        assert_eq!(space_dumps.len(), n_internal_spaces);
 
         for space_dump in &space_dumps {
             match &space_dump.space_name {
@@ -2150,8 +2367,12 @@ mod tests {
                     let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap();
                 }
 
-                _ => {
-                    unreachable!();
+                s if s == &*ClusterwideSpace::User => {
+                    let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap();
+                }
+
+                s => {
+                    unreachable!("space dump for space '{s}'");
                 }
             }
         }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 7068061bba4b5b17278225d73e0fa212fb711df1..aeb270efc208289d89f58cb23a8747f81a479289 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -13,11 +13,14 @@ use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::rpc;
 use crate::schema::{Distribution, IndexDef, SpaceDef};
+use crate::storage::acl_global_create_user;
 use crate::storage::ddl_meta_drop_space;
-use crate::storage::local_schema_version;
 use crate::storage::SnapshotData;
 use crate::storage::ToEntryIter as _;
+use crate::storage::ToEntryIter as _;
+use crate::storage::{acl_create_user_on_master, acl_drop_user_on_master};
 use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable};
+use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
 use crate::stringify_cfunc;
 use crate::sync;
@@ -27,7 +30,7 @@ use crate::traft::error::Error;
 use crate::traft::event;
 use crate::traft::event::Event;
 use crate::traft::notify::{notification, Notifier, Notify};
-use crate::traft::op::{Ddl, Dml, Op, OpResult};
+use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult};
 use crate::traft::Address;
 use crate::traft::ConnectionPool;
 use crate::traft::ContextCoercion as _;
@@ -969,6 +972,34 @@ impl NodeImpl {
                     .delete(PropertyName::PendingSchemaVersion)
                     .expect("storage error");
             }
+
+            Op::Acl {
+                schema_version,
+                acl: Acl::CreateUser { user_def },
+            } => {
+                let v_local = local_schema_version().expect("storage error");
+                let v_pending = schema_version;
+                if v_local < v_pending {
+                    if self.is_readonly() {
+                        // Wait for tarantool replication with master to progress.
+                        return SleepAndRetry;
+                    } else {
+                        assert_eq!(user_def.schema_version, v_pending);
+                        acl_create_user_on_master(&user_def).expect("creating user shouldn't fail");
+                        set_local_schema_version(v_pending).expect("storage error");
+                    }
+                }
+
+                acl_global_create_user(&self.storage, &user_def)
+                    .expect("persisting a user definition shouldn't fail");
+
+                storage_properties
+                    .put(PropertyName::GlobalSchemaVersion, &v_pending)
+                    .expect("storage error");
+                storage_properties
+                    .put(PropertyName::NextSchemaVersion, &(v_pending + 1))
+                    .expect("storage error");
+            }
         }
 
         if let Some(lc) = &lc {
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 4a65e4f43852fd3d745a453042dadc870e9b517b..36682223fea8e19b87c41bb3e74a99e22e19f70a 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -1,4 +1,4 @@
-use crate::schema::Distribution;
+use crate::schema::{Distribution, UserDef};
 use crate::storage::space_by_name;
 use crate::storage::Clusterwide;
 use ::tarantool::index::{IndexId, Part};
@@ -45,6 +45,8 @@ pub enum Op {
     ///
     /// Only one pending DDL operation can exist at the same time.
     DdlAbort,
+    /// Cluster-wide access control list change operation.
+    Acl { schema_version: u64, acl: Acl },
 }
 
 impl std::fmt::Display for Op {
@@ -109,6 +111,16 @@ impl std::fmt::Display for Op {
             }
             Self::DdlCommit => write!(f, "DdlCommit"),
             Self::DdlAbort => write!(f, "DdlAbort"),
+            Self::Acl {
+                schema_version,
+                acl: Acl::CreateUser { user_def },
+            } => {
+                write!(
+                    f,
+                    r#"CreateUser({schema_version}, {}, "{}")"#,
+                    user_def.id, user_def.name,
+                )
+            }
         };
 
         struct DisplayAsJson<T>(pub T);
@@ -395,6 +407,18 @@ impl DdlBuilder {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Acl
+////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+#[serde(tag = "op_kind")]
+pub enum Acl {
+    /// Create a tarantool user. Grant it default privileges.
+    CreateUser { user_def: UserDef },
+}
+
 mod vec_of_raw_byte_buf {
     use super::TupleBuffer;
     use serde::de::Error as _;
diff --git a/test/int/test_acl.py b/test/int/test_acl.py
new file mode 100644
index 0000000000000000000000000000000000000000..6186ae0b1a0d159057cf834348c24d1d007a2836
--- /dev/null
+++ b/test/int/test_acl.py
@@ -0,0 +1,38 @@
+from conftest import Cluster, Instance
+
+
+def propose_create_user(
+    instance: Instance,
+    id: int,
+    name: str,
+    password: str,
+    wait_index: bool = True,
+    timeout: int = 3,
+) -> int:
+    digest = instance.call("box.internal.prepare_auth", "chap-sha1", password)
+    schema_version = instance.next_schema_version()
+    op = dict(
+        kind="acl",
+        op_kind="create_user",
+        user_def=dict(
+            id=id,
+            name=name,
+            auth=dict(type="chap-sha1", digest=digest),
+            schema_version=schema_version,
+        ),
+    )
+    # TODO: use pico.cas
+    return instance.call("pico.raft_propose", op, timeout=timeout)
+
+
+def test_acl_create_user_basic(cluster: Cluster):
+    i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2)
+
+    username = "Bobby"
+    index = propose_create_user(i1, id=314, name=username, password="s3cr3t")
+
+    for i in cluster.instances:
+        i.raft_wait_index(index)
+
+    for i in cluster.instances:
+        assert i.call("box.space._user.index.name:get", username) is not None