diff --git a/src/cas.rs b/src/cas.rs
index 21c654daff2e3d0fc55e36c76b01f661e42ed6ea..5a2ab9c38cde0b9a99798e1bfd2afdec7282a598 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -33,6 +33,7 @@ const PROHIBITED_SPACES: &[ClusterwideSpaceId] = &[
     ClusterwideSpaceId::Space,
     ClusterwideSpaceId::Index,
     ClusterwideSpaceId::User,
+    ClusterwideSpaceId::Role,
     ClusterwideSpaceId::Privilege,
 ];
 
diff --git a/src/luamod.lua b/src/luamod.lua
index 8192d71934d1388e081b384b56d73753e9727bd1..027c67758c83688436b0f3952fd055a1c89dff5c 100644
--- a/src/luamod.lua
+++ b/src/luamod.lua
@@ -44,12 +44,23 @@ function pico.help(topic)
     end
 end
 
-local function get_next_user_id()
-    -- TODO: if user id overflows start filling the holes
-    local user = box.space._pico_user.index[0]:max()
-    if user ~= nil then
-        return user.id + 1
+-- Get next id unoccupied by a user or a role. Tarantool stores both users and
+-- roles in the same space, so they share the same set of ids.
+local function get_next_grantee_id()
+    -- TODO: if id overflows start filling the holes
+    local max_user = box.space._pico_user.index[0]:max()
+    local max_role = box.space._pico_role.index[0]:max()
+    local new_id = 0
+    if max_user then
+        new_id = max_user.id + 1
+    end
+    if max_role and new_id <= max_role.id then
+        new_id = max_role.id + 1
+    end
+    if new_id ~= 0 then
+        return new_id
     else
+        -- There are always builtin tarantool users
         local tt_user = box.space._user.index[0]:max()
         return tt_user.id + 1
     end
@@ -120,7 +131,7 @@ function pico.create_user(user, password, opts)
         kind = 'acl',
         op_kind = 'create_user',
         user_def = {
-            id = get_next_user_id(),
+            id = get_next_grantee_id(),
             name = user,
             schema_version = next_schema_version(),
             auth = {
@@ -240,6 +251,105 @@ function pico.drop_user(user, password, opts)
     return pico._prepare_schema_change(op, opts.timeout or 3)
 end
 
+help.create_role = [[
+pico.create_role(name, [opts])
+========================================
+
+Creates a role on each instance of the cluster.
+
+Proposes a raft entry which when applied on an instance creates a role on it.
+On success returns an index of the corresponding raft entry.
+
+Params:
+
+    1. name (string), role name
+    2. opts (table)
+        - if_not_exists (boolean), if true, do nothing if role with given name already exists
+        - timeout (number), wait for this many seconds for the proposed entry
+            to be applied locally, default: 3 seconds
+
+Returns:
+
+    (number) raft index
+    or
+    (nil, error) in case of an error
+]]
+function pico.create_role(role, opts)
+    box.internal.check_param_table(opts, { if_not_exists = 'boolean', timeout = 'number' })
+    opts = opts or {}
+
+    local grantee_def = box.space._user.index.name:get(role)
+    if grantee_def ~= nil then
+        if grantee_def.type == "user" then
+            return nil, box.error.new(box.error.USER_EXISTS, role)
+        elseif opts.if_not_exists == true then
+            -- Role exists at current index.
+            return pico.raft_get_index()
+        else
+            return nil, box.error.new(box.error.ROLE_EXISTS, role)
+        end
+    end
+
+    local op = {
+        kind = 'acl',
+        op_kind = 'create_role',
+        role_def = {
+            id = get_next_grantee_id(),
+            name = role,
+            schema_version = next_schema_version(),
+        }
+    }
+
+    return pico._prepare_schema_change(op, opts.timeout or 3)
+end
+
+help.drop_role = [[
+pico.drop_role(role, [opts])
+========================================
+
+Drop the role and any entities owned by them on each instance of the cluster.
+
+Proposes a raft entry which when applied on an instance drops the role on it.
+On success returns a raft index at which the role should no longer exist.
+
+Params:
+
+    1. role (string), role name
+    2. opts (table)
+        - if_exists (boolean), if true do nothing if role with given name doesn't exist
+        - timeout (number), wait for this many seconds for the proposed entry
+            to be applied locally, default: 3 seconds
+
+Returns:
+
+    (number) raft index
+    or
+    (nil, error) in case of an error
+]]
+function pico.drop_role(role, password, opts)
+    box.internal.check_param_table(opts, { if_exists = 'boolean', timeout = 'number' })
+    opts = opts or {}
+
+    local role_def = box.space._pico_role.index.name:get(role)
+    if role_def == nil then
+        if opts.if_exists then
+            -- Role doesn't exist at current index
+            return pico.raft_get_index()
+        else
+            return nil, box.error.new(box.error.NO_SUCH_ROLE, role)
+        end
+    end
+
+    local op = {
+        kind = 'acl',
+        op_kind = 'drop_role',
+        role_id = role_def.id,
+        schema_version = next_schema_version(),
+    }
+
+    return pico._prepare_schema_change(op, opts.timeout or 3)
+end
+
 -- A lookup map
 local supported_priveleges = {
     read = true,
@@ -401,12 +511,12 @@ local function object_resolve(object_type, object_name)
 end
 
 help.grant_privilege = [[
-pico.grant_privilege(user, privilege, object_type, [object_name], [opts])
+pico.grant_privilege(grantee, privilege, object_type, [object_name], [opts])
 ========================================
 
-Grant the user some privilege on each instance of the cluster.
+Grant some privilege to a user or role on each instance of the cluster.
 
-Proposes a raft entry which when applied on an instance grants the user the
+Proposes a raft entry which when applied on an instance grants the grantee the
 specified privilege on it.
 On success returns an index of the corresponding raft entry.
 
@@ -416,7 +526,7 @@ even if the subsequent call to this function returns an "already granted" error.
 
 Params:
 
-    1. user (string), username
+    1. grantee (string), name of user or role
 
     2. privilege (string), one of
         'read' | 'write' | 'execute' | 'session' | 'usage' | 'create' | 'drop' |
@@ -449,15 +559,23 @@ Examples:
     -- Grant user 'Dave' privilege to create new users.
     pico.grant_privilege('Dave', 'create', 'user')
 
+    -- Grant write access to space 'Junk' for role 'Maintainer'.
+    pico.grant_privilege('Maintainer', 'write', 'space', 'Junk')
+
+    -- Assign role 'Maintainer' to user 'Dave'.
+    pico.grant_privilege('Dave', 'execute', 'role', 'Maintainer')
 ]]
-function pico.grant_privilege(user, privilege, object_type, object_name, opts)
+function pico.grant_privilege(grantee, privilege, object_type, object_name, opts)
     box.internal.check_param_table(opts, { timeout = 'number' })
     opts = opts or {}
     object_name = object_name or ''
 
-    local user_def = box.space._pico_user.index.name:get(user)
-    if user_def == nil then
-        return nil, box.error.new(box.error.NO_SUCH_USER, user)
+    local grantee_def = box.space._pico_user.index.name:get(grantee)
+    if grantee_def == nil then
+        grantee_def = box.space._pico_role.index.name:get(grantee)
+    end
+    if grantee_def == nil then
+        return nil, box.error.new(box.error.NO_SUCH_USER, grantee)
     end
 
     local ok, err = pcall(privilege_check, privilege, object_type, 'grant_privilege')
@@ -470,15 +588,15 @@ function pico.grant_privilege(user, privilege, object_type, object_name, opts)
         return nil, err
     end
 
-    if box.space._pico_privilege:get{user_def.id, object_type, object_name, privilege} ~= nil then
-        return nil, box.error.new(box.error.PRIV_GRANTED, user, privilege, object_type, object_name)
+    if box.space._pico_privilege:get{grantee_def.id, object_type, object_name, privilege} ~= nil then
+        return nil, box.error.new(box.error.PRIV_GRANTED, grantee, privilege, object_type, object_name)
     end
 
     local op = {
         kind = 'acl',
         op_kind = 'grant_privilege',
         priv_def = {
-            user_id = user_def.id,
+            grantee_id = grantee_def.id,
             object_type = object_type,
             object_name = object_name,
             privilege = privilege,
@@ -490,13 +608,13 @@ function pico.grant_privilege(user, privilege, object_type, object_name, opts)
 end
 
 help.revoke_privilege = [[
-pico.revoke_privilege(user, privilege, object_type, [object_name], [opts])
+pico.revoke_privilege(grantee, privilege, object_type, [object_name], [opts])
 ========================================
 
-Revoke some privilege from the user on each instance of the cluster.
+Revoke some privilege from the user or role on each instance of the cluster.
 
 Proposes a raft entry which when applied on an instance revokes the specified
-privilege from the user on it.
+privilege from the grantee on it.
 On success returns an index of the corresponding raft entry.
 
 NOTE: If this function returns a timeout error, the change may have been locally
@@ -505,7 +623,7 @@ even if the subsequent call to this function returns an "not granted" error.
 
 Params:
 
-    1. user (string), username
+    1. grantee (string), name of user or role
 
     2. privilege (string), one of
         'read' | 'write' | 'execute' | 'session' | 'usage' | 'create' | 'drop' |
@@ -527,14 +645,17 @@ Returns:
     or
     (nil, error) in case of an error
 ]]
-function pico.revoke_privilege(user, privilege, object_type, object_name, opts)
+function pico.revoke_privilege(grantee, privilege, object_type, object_name, opts)
     box.internal.check_param_table(opts, { timeout = 'number' })
     opts = opts or {}
     object_name = object_name or ''
 
-    local user_def = box.space._pico_user.index.name:get(user)
-    if user_def == nil then
-        return nil, box.error.new(box.error.NO_SUCH_USER, user)
+    local grantee_def = box.space._pico_user.index.name:get(grantee)
+    if grantee_def == nil then
+        grantee_def = box.space._pico_role.index.name:get(grantee)
+    end
+    if grantee_def == nil then
+        return nil, box.error.new(box.error.NO_SUCH_USER, grantee)
     end
 
     local ok, err = pcall(privilege_check, privilege, object_type, 'revoke_privilege')
@@ -547,15 +668,15 @@ function pico.revoke_privilege(user, privilege, object_type, object_name, opts)
         return nil, err
     end
 
-    if box.space._pico_privilege:get{user_def.id, object_type, object_name, privilege} == nil then
-        return nil, box.error.new(box.error.PRIV_NOT_GRANTED, user, privilege, object_type, object_name)
+    if box.space._pico_privilege:get{grantee_def.id, object_type, object_name, privilege} == nil then
+        return nil, box.error.new(box.error.PRIV_NOT_GRANTED, grantee, privilege, object_type, object_name)
     end
 
     local op = {
         kind = 'acl',
         op_kind = 'revoke_privilege',
         priv_def = {
-            user_id = user_def.id,
+            grantee_id = grantee_def.id,
             object_type = object_type,
             object_name = object_name,
             privilege = privilege,
diff --git a/src/schema.rs b/src/schema.rs
index 7fedf8ebab4aaf435985598bce3a455b9e6a65f9..cf40b4ea3dfe99347306e8f11bd934cf7652221a 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -206,6 +206,20 @@ impl UserDef {
     pub const FIELD_AUTH: usize = 3;
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// RoleDef
+////////////////////////////////////////////////////////////////////////////////
+
+/// Role definition.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RoleDef {
+    pub id: UserId,
+    pub name: String,
+    pub schema_version: u64,
+}
+
+impl Encode for RoleDef {}
+
 ////////////////////////////////////////////////////////////////////////////////
 // PrivilegeDef
 ////////////////////////////////////////////////////////////////////////////////
@@ -213,7 +227,11 @@ impl UserDef {
 /// Privilege definition.
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
 pub struct PrivilegeDef {
-    pub user_id: UserId,
+    /// Id of the user or role to whom the privilege is granted.
+    ///
+    /// In tarantool users and roles are stored in the same space, which means a
+    /// role and a user cannot have the same id or name.
+    pub grantee_id: UserId,
     pub object_type: String,
     pub object_name: String,
     pub privilege: String,
diff --git a/src/storage.rs b/src/storage.rs
index e0066139a71cc16921e8852daf63a10edcf76a29..b3604b84c183ce2086462ad24e73ec172f106d56 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -12,7 +12,9 @@ use ::tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer};
 use crate::failure_domain as fd;
 use crate::instance::{self, grade, Instance};
 use crate::replicaset::{Replicaset, ReplicasetId};
-use crate::schema::{AuthDef, Distribution, IndexDef, PrivilegeDef, SpaceDef, UserDef, UserId};
+use crate::schema::{
+    AuthDef, Distribution, IndexDef, PrivilegeDef, RoleDef, SpaceDef, UserDef, UserId,
+};
 use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
@@ -492,6 +494,21 @@ define_clusterwide_spaces! {
             #[allow(clippy::enum_variant_names)]
             pub enum SpaceUserIndex;
         }
+        Role = 522, "_pico_role" => {
+            Clusterwide::roles;
+
+            /// A struct for accessing info of all the user-defined roles.
+            pub struct Roles {
+                space: Space,
+                #[primary]
+                index_id: Index => Id = "id",
+                index_name: Index => Name = "name",
+            }
+
+            /// An enumeration of indexes defined for "_pico_role".
+            #[allow(clippy::enum_variant_names)]
+            pub enum SpaceRoleIndex;
+        }
         Privilege = 521, "_pico_privilege" => {
             Clusterwide::privileges;
 
@@ -588,6 +605,7 @@ impl Clusterwide {
         // These need to be saved before we truncate the corresponding spaces.
         let mut old_space_versions = HashMap::new();
         let mut old_user_versions = HashMap::new();
+        let mut old_role_versions = HashMap::new();
         let mut old_priv_versions = HashMap::new();
 
         for def in self.spaces.iter()? {
@@ -596,6 +614,9 @@ impl Clusterwide {
         for def in self.users.iter()? {
             old_user_versions.insert(def.id, def.schema_version);
         }
+        for def in self.roles.iter()? {
+            old_role_versions.insert(def.id, def.schema_version);
+        }
         for def in self.privileges.iter()? {
             let schema_version = def.schema_version;
             old_priv_versions.insert(def, schema_version);
@@ -621,6 +642,7 @@ impl Clusterwide {
             self.apply_schema_changes_on_master(self.spaces.iter()?, &old_space_versions)?;
             // TODO: secondary indexes
             self.apply_schema_changes_on_master(self.users.iter()?, &old_user_versions)?;
+            self.apply_schema_changes_on_master(self.roles.iter()?, &old_role_versions)?;
             self.apply_schema_changes_on_master(self.privileges.iter()?, &old_priv_versions)?;
             set_local_schema_version(data.schema_version)?;
         }
@@ -1984,6 +2006,91 @@ impl ToEntryIter for Users {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Roles
+////////////////////////////////////////////////////////////////////////////////
+
+impl Roles {
+    pub fn new() -> tarantool::Result<Self> {
+        let space = Space::builder(Self::SPACE_NAME)
+            .id(Self::SPACE_ID)
+            .is_local(true)
+            .is_temporary(false)
+            .field(("id", FieldType::Unsigned))
+            .field(("name", FieldType::String))
+            .field(("schema_version", FieldType::Unsigned))
+            .if_not_exists(true)
+            .create()?;
+
+        let index_id = space
+            .index_builder(IndexOf::<Self>::Id.as_str())
+            .unique(true)
+            .part("id")
+            .if_not_exists(true)
+            .create()?;
+
+        let index_name = space
+            .index_builder(IndexOf::<Self>::Name.as_str())
+            .unique(true)
+            .part("name")
+            .if_not_exists(true)
+            .create()?;
+
+        Ok(Self {
+            space,
+            index_id,
+            index_name,
+        })
+    }
+
+    #[inline]
+    pub fn by_id(&self, role_id: UserId) -> tarantool::Result<Option<RoleDef>> {
+        let tuple = self.space.get(&[role_id])?;
+        let mut res = None;
+        if let Some(tuple) = tuple {
+            res = Some(tuple.decode()?);
+        }
+        Ok(res)
+    }
+
+    #[inline]
+    pub fn by_name(&self, role_name: &str) -> tarantool::Result<Option<RoleDef>> {
+        let tuple = self.index_name.get(&[role_name])?;
+        let mut res = None;
+        if let Some(tuple) = tuple {
+            res = Some(tuple.decode()?);
+        }
+        Ok(res)
+    }
+
+    #[inline]
+    pub fn replace(&self, role_def: &RoleDef) -> tarantool::Result<()> {
+        self.space.replace(role_def)?;
+        Ok(())
+    }
+
+    #[inline]
+    pub fn insert(&self, role_def: &RoleDef) -> tarantool::Result<()> {
+        self.space.insert(role_def)?;
+        Ok(())
+    }
+
+    #[inline]
+    pub fn delete(&self, role_id: UserId) -> tarantool::Result<()> {
+        self.space.delete(&[role_id])?;
+        Ok(())
+    }
+}
+
+impl ToEntryIter for Roles {
+    type Entry = RoleDef;
+
+    #[inline(always)]
+    fn index_iter(&self) -> Result<IndexIterator> {
+        Ok(self.space.select(IteratorType::All, &())?)
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // Privileges
 ////////////////////////////////////////////////////////////////////////////////
@@ -1994,7 +2101,7 @@ impl Privileges {
             .id(Self::SPACE_ID)
             .is_local(true)
             .is_temporary(false)
-            .field(("user_id", FieldType::Unsigned))
+            .field(("grantee_id", FieldType::Unsigned))
             .field(("object_type", FieldType::String))
             .field(("object_name", FieldType::String))
             .field(("privilege", FieldType::String))
@@ -2005,7 +2112,7 @@ impl Privileges {
         let primary_key = space
             .index_builder(IndexOf::<Self>::Primary.as_str())
             .unique(true)
-            .parts(["user_id", "object_type", "object_name", "privilege"])
+            .parts(["grantee_id", "object_type", "object_name", "privilege"])
             .if_not_exists(true)
             .create()?;
 
@@ -2015,11 +2122,11 @@ impl Privileges {
     #[inline(always)]
     pub fn get(
         &self,
-        user_id: UserId,
+        grantee_id: UserId,
         object_type: &str,
         object_name: &str,
     ) -> tarantool::Result<Option<PrivilegeDef>> {
-        let tuple = self.space.get(&(user_id, object_type, object_name))?;
+        let tuple = self.space.get(&(grantee_id, object_type, object_name))?;
         let mut res = None;
         if let Some(tuple) = tuple {
             res = Some(tuple.decode()?);
@@ -2028,8 +2135,8 @@ impl Privileges {
     }
 
     #[inline(always)]
-    pub fn by_user_id(&self, user_id: UserId) -> tarantool::Result<EntryIter<PrivilegeDef>> {
-        let iter = self.primary_key.select(IteratorType::Eq, &[user_id])?;
+    pub fn by_grantee_id(&self, grantee_id: UserId) -> tarantool::Result<EntryIter<PrivilegeDef>> {
+        let iter = self.primary_key.select(IteratorType::Eq, &[grantee_id])?;
         Ok(EntryIter::new(iter))
     }
 
@@ -2048,21 +2155,22 @@ impl Privileges {
     #[inline(always)]
     pub fn delete(
         &self,
-        user_id: UserId,
+        grantee_id: UserId,
         object_type: &str,
         object_name: &str,
         privilege: &str,
     ) -> tarantool::Result<()> {
         self.space
-            .delete(&(user_id, object_type, object_name, privilege))?;
+            .delete(&(grantee_id, object_type, object_name, privilege))?;
         Ok(())
     }
 
-    #[inline(always)]
-    pub fn delete_all_by_user_id(&self, user_id: UserId) -> tarantool::Result<()> {
-        for priv_def in self.by_user_id(user_id)? {
+    /// Remove any privilege definitions granted to the given grantee.
+    #[inline]
+    pub fn delete_all_by_grantee_id(&self, grantee_id: UserId) -> tarantool::Result<()> {
+        for priv_def in self.by_grantee_id(grantee_id)? {
             self.delete(
-                priv_def.user_id,
+                priv_def.grantee_id,
                 &priv_def.object_type,
                 &priv_def.object_name,
                 &priv_def.privilege,
@@ -2070,6 +2178,25 @@ impl Privileges {
         }
         Ok(())
     }
+
+    /// Remove any privilege definitions assigning the given role.
+    #[inline]
+    pub fn delete_all_by_granted_role(&self, role_name: &str) -> Result<()> {
+        for priv_def in self.iter()? {
+            if priv_def.privilege == "execute"
+                && priv_def.object_type == "role"
+                && priv_def.object_name == role_name
+            {
+                self.delete(
+                    priv_def.grantee_id,
+                    &priv_def.object_type,
+                    &priv_def.object_name,
+                    &priv_def.privilege,
+                )?;
+            }
+        }
+        Ok(())
+    }
 }
 
 impl ToEntryIter for Privileges {
@@ -2192,6 +2319,34 @@ impl SchemaDef for UserDef {
     }
 }
 
+impl SchemaDef for RoleDef {
+    type Key = UserId;
+
+    #[inline(always)]
+    fn key(&self) -> UserId {
+        self.id
+    }
+
+    #[inline(always)]
+    fn schema_version(&self) -> u64 {
+        self.schema_version
+    }
+
+    #[inline(always)]
+    fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()> {
+        _ = storage;
+        acl_create_role_on_master(self)?;
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn on_delete(user_id: &UserId, storage: &Clusterwide) -> traft::Result<()> {
+        _ = storage;
+        acl_drop_role_on_master(*user_id)?;
+        Ok(())
+    }
+}
+
 impl SchemaDef for PrivilegeDef {
     type Key = Self;
 
@@ -2244,11 +2399,31 @@ pub fn acl_global_change_user_auth(
 /// Remove a user definition and any entities owned by it from the internal
 /// clusterwide storage.
 pub fn acl_global_drop_user(storage: &Clusterwide, user_id: UserId) -> tarantool::Result<()> {
-    storage.privileges.delete_all_by_user_id(user_id)?;
+    storage.privileges.delete_all_by_grantee_id(user_id)?;
     storage.users.delete(user_id)?;
     Ok(())
 }
 
+/// Persist a role definition in the internal clusterwide storage.
+pub fn acl_global_create_role(storage: &Clusterwide, role_def: &RoleDef) -> tarantool::Result<()> {
+    storage.roles.insert(role_def)?;
+    Ok(())
+}
+
+/// Remove a role definition and any entities owned by it from the internal
+/// clusterwide storage.
+pub fn acl_global_drop_role(storage: &Clusterwide, role_id: UserId) -> Result<()> {
+    storage.privileges.delete_all_by_grantee_id(role_id)?;
+    if let Some(role_def) = storage.roles.by_id(role_id)? {
+        // Revoke the role from any grantees.
+        storage
+            .privileges
+            .delete_all_by_granted_role(&role_def.name)?;
+        storage.roles.delete(role_id)?;
+    }
+    Ok(())
+}
+
 /// Persist a privilege definition in the internal clusterwide storage.
 pub fn acl_global_grant_privilege(
     storage: &Clusterwide,
@@ -2265,7 +2440,7 @@ pub fn acl_global_revoke_privilege(
 ) -> tarantool::Result<()> {
     // FIXME: currently there's no way to revoke a default privilege
     storage.privileges.delete(
-        priv_def.user_id,
+        priv_def.grantee_id,
         &priv_def.object_type,
         &priv_def.object_name,
         &priv_def.privilege,
@@ -2340,13 +2515,49 @@ pub fn acl_drop_user_on_master(user_id: UserId) -> tarantool::Result<()> {
     Ok(())
 }
 
+/// Create a tarantool role.
+pub fn acl_create_role_on_master(role_def: &RoleDef) -> tarantool::Result<()> {
+    let sys_user = Space::from(SystemSpace::User);
+
+    // This impelemtation was copied from box.schema.role.create.
+
+    // Tarantool expects auth info to be a map `{}`, and currently the simplest
+    // way to achieve this is to use a HashMap.
+    sys_user.insert(&(
+        role_def.id,
+        ::tarantool::session::euid()?,
+        &role_def.name,
+        "role",
+        HashMap::<(), ()>::new(),
+        &[(); 0],
+        0,
+    ))?;
+
+    Ok(())
+}
+
+/// Drop a tarantool role and revoke it from anybody it was assigned to.
+pub fn acl_drop_role_on_master(role_id: UserId) -> tarantool::Result<()> {
+    let lua = ::tarantool::lua_state();
+    lua.exec_with("box.schema.role.drop(...)", role_id)
+        .map_err(LuaError::from)?;
+
+    Ok(())
+}
+
 /// Grant a tarantool user some privilege defined by `priv_def`.
 pub fn acl_grant_privilege_on_master(priv_def: &PrivilegeDef) -> tarantool::Result<()> {
     let lua = ::tarantool::lua_state();
     lua.exec_with(
-        "box.schema.user.grant(...)",
+        "local grantee_id, privilege, object_type, object_name = ...
+        local grantee_def = box.space._user:get(grantee_id)
+        if grantee_def.type == 'user' then
+            box.schema.user.grant(grantee_id, privilege, object_type, object_name)
+        else
+            box.schema.role.grant(grantee_id, privilege, object_type, object_name)
+        end",
         (
-            priv_def.user_id,
+            priv_def.grantee_id,
             &priv_def.privilege,
             &priv_def.object_type,
             &priv_def.object_name,
@@ -2361,9 +2572,15 @@ pub fn acl_grant_privilege_on_master(priv_def: &PrivilegeDef) -> tarantool::Resu
 pub fn acl_revoke_privilege_on_master(priv_def: &PrivilegeDef) -> tarantool::Result<()> {
     let lua = ::tarantool::lua_state();
     lua.exec_with(
-        "box.schema.user.revoke(...)",
+        "local grantee_id, privilege, object_type, object_name = ...
+        local grantee_def = box.space._user:get(grantee_id)
+        if grantee_def.type == 'user' then
+            box.schema.user.revoke(grantee_id, privilege, object_type, object_name)
+        else
+            box.schema.role.revoke(grantee_id, privilege, object_type, object_name)
+        end",
         (
-            priv_def.user_id,
+            priv_def.grantee_id,
             &priv_def.privilege,
             &priv_def.object_type,
             &priv_def.object_name,
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 4bc15438a265dbe70e658b21ec6785a620247c72..e392826fff3c19f6d2ef1b5db5c6beae53e84cf4 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -17,11 +17,12 @@ use crate::storage::ddl_meta_drop_space;
 use crate::storage::SnapshotData;
 use crate::storage::ToEntryIter as _;
 use crate::storage::{
-    acl_change_user_auth_on_master, acl_create_user_on_master, acl_drop_user_on_master,
+    acl_change_user_auth_on_master, acl_create_role_on_master, acl_create_user_on_master,
+    acl_drop_role_on_master, acl_drop_user_on_master, acl_global_change_user_auth,
+    acl_global_create_role, acl_global_create_user, acl_global_drop_role, acl_global_drop_user,
+    acl_global_grant_privilege, acl_global_revoke_privilege, acl_grant_privilege_on_master,
+    acl_revoke_privilege_on_master,
 };
-use crate::storage::{acl_global_change_user_auth, acl_global_create_user, acl_global_drop_user};
-use crate::storage::{acl_global_grant_privilege, acl_global_revoke_privilege};
-use crate::storage::{acl_grant_privilege_on_master, acl_revoke_privilege_on_master};
 use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable};
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
@@ -997,6 +998,14 @@ impl NodeImpl {
                                 acl_drop_user_on_master(*user_id)
                                     .expect("droping user shouldn't fail");
                             }
+                            Acl::CreateRole { role_def } => {
+                                acl_create_role_on_master(role_def)
+                                    .expect("creating role shouldn't fail");
+                            }
+                            Acl::DropRole { role_id, .. } => {
+                                acl_drop_role_on_master(*role_id)
+                                    .expect("droping role shouldn't fail");
+                            }
                             Acl::GrantPrivilege { priv_def } => {
                                 acl_grant_privilege_on_master(priv_def)
                                     .expect("granting a privilege shouldn't fail");
@@ -1023,6 +1032,14 @@ impl NodeImpl {
                         acl_global_drop_user(&self.storage, *user_id)
                             .expect("droping a user definition shouldn't fail");
                     }
+                    Acl::CreateRole { role_def } => {
+                        acl_global_create_role(&self.storage, role_def)
+                            .expect("persisting a role definition shouldn't fail");
+                    }
+                    Acl::DropRole { role_id, .. } => {
+                        acl_global_drop_role(&self.storage, *role_id)
+                            .expect("droping a role definition shouldn't fail");
+                    }
                     Acl::GrantPrivilege { priv_def } => {
                         acl_global_grant_privilege(&self.storage, priv_def)
                             .expect("persiting a privilege definition shouldn't fail");
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 1220816081579b5a79dd4bc7eedba2e7a7e6691a..f8672cb0bfd705f92661912170855f591509a398 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -1,4 +1,4 @@
-use crate::schema::{AuthDef, Distribution, PrivilegeDef, UserDef, UserId};
+use crate::schema::{AuthDef, Distribution, PrivilegeDef, RoleDef, UserDef, UserId};
 use crate::storage::space_by_name;
 use crate::storage::Clusterwide;
 use ::tarantool::index::{IndexId, Part};
@@ -133,27 +133,42 @@ impl std::fmt::Display for Op {
             }) => {
                 write!(f, "DropUser({schema_version}, {user_id})")
             }
+            Self::Acl(Acl::CreateRole { role_def }) => {
+                let RoleDef {
+                    id,
+                    name,
+                    schema_version,
+                    ..
+                } = role_def;
+                write!(f, r#"CreateRole({schema_version}, {id}, "{name}")"#,)
+            }
+            Self::Acl(Acl::DropRole {
+                role_id,
+                schema_version,
+            }) => {
+                write!(f, "DropRole({schema_version}, {role_id})")
+            }
             Self::Acl(Acl::GrantPrivilege { priv_def }) => {
                 let PrivilegeDef {
-                    user_id,
+                    grantee_id,
                     object_type,
                     object_name,
                     privilege,
                     schema_version,
                     ..
                 } = priv_def;
-                write!(f, "GrantPrivilege({schema_version}, {user_id}, {object_type}, {object_name}, {privilege})")
+                write!(f, "GrantPrivilege({schema_version}, {grantee_id}, {object_type}, {object_name}, {privilege})")
             }
             Self::Acl(Acl::RevokePrivilege { priv_def }) => {
                 let PrivilegeDef {
-                    user_id,
+                    grantee_id,
                     object_type,
                     object_name,
                     privilege,
                     schema_version,
                     ..
                 } = priv_def;
-                write!(f, "RevokePrivilege({schema_version}, {user_id}, {object_type}, {object_name}, {privilege})")
+                write!(f, "RevokePrivilege({schema_version}, {grantee_id}, {object_type}, {object_name}, {privilege})")
             }
         };
 
@@ -488,10 +503,19 @@ pub enum Acl {
         schema_version: u64,
     },
 
-    /// Grant a user some privilege.
+    /// Create a tarantool role. Grant it default privileges.
+    CreateRole { role_def: RoleDef },
+
+    /// Drop a tarantool role and revoke it from any grantees.
+    DropRole {
+        role_id: UserId,
+        schema_version: u64,
+    },
+
+    /// Grant some privilege to a user or a role.
     GrantPrivilege { priv_def: PrivilegeDef },
 
-    /// Revoke a user some privilege.
+    /// Revoke some privilege from a user or a role.
     RevokePrivilege { priv_def: PrivilegeDef },
 }
 
@@ -501,6 +525,8 @@ impl Acl {
             Self::CreateUser { user_def } => user_def.schema_version,
             Self::ChangeAuth { schema_version, .. } => *schema_version,
             Self::DropUser { schema_version, .. } => *schema_version,
+            Self::CreateRole { role_def, .. } => role_def.schema_version,
+            Self::DropRole { schema_version, .. } => *schema_version,
             Self::GrantPrivilege { priv_def } => priv_def.schema_version,
             Self::RevokePrivilege { priv_def } => priv_def.schema_version,
         }
@@ -511,6 +537,8 @@ impl Acl {
             Self::CreateUser { user_def } => user_def.schema_version = new_schema_version,
             Self::ChangeAuth { schema_version, .. } => *schema_version = new_schema_version,
             Self::DropUser { schema_version, .. } => *schema_version = new_schema_version,
+            Self::CreateRole { role_def, .. } => role_def.schema_version = new_schema_version,
+            Self::DropRole { schema_version, .. } => *schema_version = new_schema_version,
             Self::GrantPrivilege { priv_def } => priv_def.schema_version = new_schema_version,
             Self::RevokePrivilege { priv_def } => priv_def.schema_version = new_schema_version,
         }
diff --git a/test/int/test_acl.py b/test/int/test_acl.py
index e53287f362f9950c5e4b8aa77446f44d1359b39c..7a6c56314ac039ce04fcb6bd81615a2f1d4d97c6 100644
--- a/test/int/test_acl.py
+++ b/test/int/test_acl.py
@@ -166,4 +166,73 @@ def test_acl_basic(cluster: Cluster):
             i.eval("return 1", user=user, password=new_password)
 
 
+def test_acl_roles_basic(cluster: Cluster):
+    i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2)
+
+    user = "Steven"
+    password = "1234"
+
+    # Create user.
+    index = i1.call("pico.create_user", user, password)
+    cluster.raft_wait_index(index)
+
+    # Doing anything via remote function execution requires execute access
+    # to the "universe"
+    index = i1.call("pico.grant_privilege", user, "execute", "universe")
+    cluster.raft_wait_index(index)
+
+    # Try reading from space on behalf of the user.
+    for i in cluster.instances:
+        with pytest.raises(
+            TarantoolError,
+            match="Read access to space '_pico_property' is denied for user 'Steven'",
+        ):
+            i.call("box.space._pico_property:select", user=user, password=password)
+
+    #
+    #
+    # Create role.
+    role = "PropertyReader"
+    index = i1.call("pico.create_role", role)
+    cluster.raft_wait_index(index)
+
+    # Grant the role read access.
+    index = i1.call("pico.grant_privilege", role, "read", "space", "_pico_property")
+    cluster.raft_wait_index(index)
+
+    # Assign role to user.
+    index = i1.call("pico.grant_privilege", user, "execute", "role", role)
+    cluster.raft_wait_index(index)
+
+    # Try reading from space on behalf of the user again. Now succeed.
+    for i in cluster.instances:
+        rows = i.call("box.space._pico_property:select", user=user, password=password)
+        assert len(rows) > 0
+
+    # Revoke read access from the role.
+    index = i1.call("pico.revoke_privilege", role, "read", "space", "_pico_property")
+    cluster.raft_wait_index(index)
+
+    # Try reading from space on behalf of the user yet again, which fails again.
+    for i in cluster.instances:
+        with pytest.raises(
+            TarantoolError,
+            match="Read access to space '_pico_property' is denied for user 'Steven'",
+        ):
+            i.call("box.space._pico_property:select", user=user, password=password)
+
+    # Drop the role.
+    index = i1.call("pico.drop_role", role)
+    cluster.raft_wait_index(index)
+
+    # Nothing changed here.
+    for i in cluster.instances:
+        with pytest.raises(
+            TarantoolError,
+            match="Read access to space '_pico_property' is denied for user 'Steven'",
+        ):
+            i.call("box.space._pico_property:select", user=user, password=password)
+
+
+# TODO: test acl via snapshot
 # TODO: test acl get denied when there's an unfinished ddl