From 0845aa03396b326cd9286dd061374607288bb378 Mon Sep 17 00:00:00 2001
From: Kurdakov Alexander <kusancho12@gmail.com>
Date: Thu, 7 Dec 2023 01:43:27 +0300
Subject: [PATCH] feat: check for circular role grants

---
 src/access_control.rs | 268 +++++++++++++++++++++++++++++++++++++-----
 src/cas.rs            |   2 +-
 src/storage.rs        |  40 +++----
 test/int/test_acl.py  |  63 ++++++++++
 4 files changed, 320 insertions(+), 53 deletions(-)

diff --git a/src/access_control.rs b/src/access_control.rs
index b009a680f1..78bd63083d 100644
--- a/src/access_control.rs
+++ b/src/access_control.rs
@@ -26,7 +26,11 @@
 //! tarantool perspective so we bypass vanilla checks by using `box.session.su(ADMIN)`. Then on the raft leader box.session.su
 //! is used again to switch to user provided in the request. This is needed because tarantool functions we use for access checks
 //! make them based on effective user.
-use std::{borrow::Cow, collections::HashMap, fmt::Display};
+use std::{
+    borrow::Cow,
+    collections::{HashMap, HashSet},
+    fmt::Display,
+};
 
 use tarantool::{
     access_control::{
@@ -40,7 +44,7 @@ use tarantool::{
 
 use crate::{
     schema::{PrivilegeDef, PrivilegeType, SchemaObjectType as PicoSchemaObjectType},
-    storage::space_by_id,
+    storage::{space_by_id, Clusterwide, ToEntryIter},
     traft::op::{self, Op},
     ADMIN_USER_ID,
 };
@@ -207,7 +211,73 @@ fn access_check_ddl(ddl: &op::Ddl, as_user: UserId) -> tarantool::Result<()> {
     }
 }
 
+/// This is port of vanilla tarantool function box/user.cc::role_check.
+fn detect_role_grant_cycles(
+    granted_role: &UserMetadata,
+    priv_def: &PrivilegeDef,
+    storage: &Clusterwide,
+) -> tarantool::Result<()> {
+    let grantee_id = priv_def.grantee_id();
+    let grantee_name = {
+        let grantee = user_by_id(grantee_id)?;
+        let grantee_meta: UserMetadata = grantee.decode()?;
+        grantee_meta.name
+    };
+
+    let role_id_to_grantees_id = {
+        let mut hm = HashMap::new();
+        for privilege in storage.privileges.iter()? {
+            if privilege.object_type() == PicoSchemaObjectType::Role
+                && privilege.privilege() == PrivilegeType::Execute
+            {
+                match privilege.object_id() {
+                    Some(object_id) => {
+                        hm.entry(object_id as i64)
+                            .or_insert(Vec::new())
+                            .push(privilege.grantee_id() as i64);
+                    }
+                    None => continue,
+                }
+            }
+        }
+
+        hm
+    };
+
+    let mut visited = HashSet::new();
+    visited.insert(grantee_id as i64);
+
+    let mut current_layer = visited.clone();
+    // It's BFS with start from grantee_id
+    while !current_layer.is_empty() {
+        let mut next_layer = HashSet::new();
+
+        for role_id in current_layer.iter() {
+            let parents = role_id_to_grantees_id.get(role_id);
+            if let Some(parents) = parents {
+                next_layer.extend(parents.iter());
+            }
+        }
+
+        visited.extend(next_layer.iter());
+        current_layer = next_layer;
+    }
+
+    if visited.contains(&(granted_role.id as i64)) {
+        let err = tarantool::set_and_get_error!(
+            tarantool::error::TarantoolErrorCode::RoleLoop,
+            "Granting role {} to role {} would create a loop",
+            granted_role.name,
+            grantee_name
+        );
+        return Err(err.into());
+    }
+
+    Ok(())
+}
+
 fn access_check_grant_revoke(
+    storage: &Clusterwide,
     priv_def: &PrivilegeDef,
     grantor_id: UserId,
     access: PrivType,
@@ -274,33 +344,32 @@ fn access_check_grant_revoke(
             );
         }
         PicoSchemaObjectType::Role => {
-            let sys_role = user_by_id(object_id)?;
-            let sys_role_meta: UserMetadata = sys_role.decode()?;
+            let granted_role = user_by_id(object_id)?;
+            let granted_role_meta: UserMetadata = granted_role.decode()?;
 
-            assert_eq!(object_id, sys_role_meta.id, "user metadata id mismatch");
+            assert_eq!(object_id, granted_role_meta.id, "user metadata id mismatch");
 
             // Only the creator of the role or admin can grant or revoke it.
             // Everyone can grant 'PUBLIC' role.
             // Note that having a role means having execute privilege on it.
-            if sys_role_meta.owner_id != grantor_id
+            if granted_role_meta.owner_id != grantor_id
                 && grantor_id != ADMIN_USER_ID
-                && !(sys_role_meta.name == "public"
+                && !(granted_role_meta.name == "public"
                     && priv_def.privilege() == PrivilegeType::Execute)
             {
                 return Err(make_access_denied(
                     access_name,
                     PicoSchemaObjectType::Role,
-                    sys_role_meta.name,
+                    granted_role_meta.name,
                     grantor.name,
                 ));
             }
 
-            // TODO Check circular grants for roles
-            // see https://git.picodata.io/picodata/picodata/picodata/-/issues/415
+            detect_role_grant_cycles(&granted_role_meta, priv_def, storage)?;
 
             return box_access_check_ddl_as_user(
-                &sys_role_meta.name,
-                sys_role_meta.id,
+                &granted_role_meta.name,
+                granted_role_meta.id,
                 grantor_id,
                 TntSchemaObjectType::Role,
                 access,
@@ -338,7 +407,11 @@ fn access_check_grant_revoke(
     Ok(())
 }
 
-fn access_check_acl(acl: &op::Acl, as_user: UserId) -> tarantool::Result<()> {
+fn access_check_acl(
+    storage: &Clusterwide,
+    acl: &op::Acl,
+    as_user: UserId,
+) -> tarantool::Result<()> {
     match acl {
         op::Acl::CreateUser { user_def } => {
             assert_eq!(
@@ -416,15 +489,19 @@ fn access_check_acl(acl: &op::Acl, as_user: UserId) -> tarantool::Result<()> {
             )
         }
         op::Acl::GrantPrivilege { priv_def } => {
-            access_check_grant_revoke(priv_def, as_user, PrivType::Grant, "Grant")
+            access_check_grant_revoke(storage, priv_def, as_user, PrivType::Grant, "Grant")
         }
         op::Acl::RevokePrivilege { priv_def } => {
-            access_check_grant_revoke(priv_def, as_user, PrivType::Revoke, "Revoke")
+            access_check_grant_revoke(storage, priv_def, as_user, PrivType::Revoke, "Revoke")
         }
     }
 }
 
-pub(super) fn access_check_op(op: &op::Op, as_user: UserId) -> tarantool::Result<()> {
+pub(super) fn access_check_op(
+    storage: &Clusterwide,
+    op: &op::Op,
+    as_user: UserId,
+) -> tarantool::Result<()> {
     match op {
         Op::Nop => Ok(()),
         Op::Dml(dml) => access_check_dml(dml, as_user),
@@ -442,7 +519,7 @@ pub(super) fn access_check_op(op: &op::Op, as_user: UserId) -> tarantool::Result
             }
             Ok(())
         }
-        Op::Acl(acl) => access_check_acl(acl, as_user),
+        Op::Acl(acl) => access_check_acl(storage, acl, as_user),
     }
 }
 
@@ -455,9 +532,12 @@ mod tests {
         schema::{
             Distribution, PrivilegeDef, PrivilegeType, RoleDef, SchemaObjectType, UserDef, ADMIN_ID,
         },
-        storage::acl::{
-            on_master_create_role, on_master_create_user, on_master_grant_privilege,
-            on_master_revoke_privilege,
+        storage::{
+            acl::{
+                global_create_role, global_grant_privilege, on_master_create_role,
+                on_master_create_user, on_master_grant_privilege, on_master_revoke_privilege,
+            },
+            Clusterwide,
         },
         traft::op::{Acl, Ddl, Dml, Op},
         ADMIN_USER_ID,
@@ -519,6 +599,7 @@ mod tests {
 
     #[track_caller]
     fn grant(
+        storage: &Clusterwide,
         privilege: PrivilegeType,
         object_type: SchemaObjectType,
         object_id: i64,
@@ -536,6 +617,7 @@ mod tests {
         .expect("must be valid");
 
         access_check_op(
+            storage,
             &Op::Acl(Acl::GrantPrivilege {
                 priv_def: priv_def.clone(),
             }),
@@ -543,11 +625,12 @@ mod tests {
         )
         .unwrap();
 
-        on_master_grant_privilege(&priv_def).unwrap()
+        on_master_grant_privilege(&priv_def).unwrap();
     }
 
     #[track_caller]
     fn revoke(
+        storage: &Clusterwide,
         grantee_id: UserId,
         privilege: PrivilegeType,
         object_type: SchemaObjectType,
@@ -564,6 +647,7 @@ mod tests {
         .expect("must be valid");
 
         access_check_op(
+            storage,
             &Op::Acl(Acl::RevokePrivilege {
                 priv_def: priv_def.clone(),
             }),
@@ -578,6 +662,7 @@ mod tests {
         let user_name = "box_access_check_space_test_user";
 
         let user_id = make_user(user_name, None);
+        let storage = Clusterwide::for_tests();
 
         // space
         let space_name = "test_box_access_check_ddl";
@@ -603,6 +688,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Create,
                 SchemaObjectType::Table,
                 -1,
@@ -623,6 +709,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::Table,
                 -1,
@@ -633,7 +720,13 @@ mod tests {
             access_check_ddl(&Ddl::DropTable { id: space.id() }, user_id).unwrap();
         }
 
-        revoke(user_id, PrivilegeType::Drop, SchemaObjectType::Table, -1);
+        revoke(
+            &storage,
+            user_id,
+            PrivilegeType::Drop,
+            SchemaObjectType::Table,
+            -1,
+        );
 
         // drop on particular entity works
         {
@@ -645,6 +738,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::Table,
                 space.id() as i64,
@@ -683,10 +777,10 @@ mod tests {
                 (PrivilegeType::Write, "Write", write_op),
             ] {
                 // owner himself has permission on an object
-                access_check_op(&op, user_id).unwrap();
+                access_check_op(&storage, &op, user_id).unwrap();
 
                 // run access check for another user, it fails without grant
-                let e = access_check_op(&op, grantee_user_id).unwrap_err();
+                let e = access_check_op(&storage, &op, grantee_user_id).unwrap_err();
 
                 assert_eq!(
                     e.to_string(),
@@ -695,6 +789,7 @@ mod tests {
 
                 // grant permission on behalf of the user owning the space
                 grant(
+                    &storage,
                     privilege,
                     SchemaObjectType::Table,
                     space_grant.id() as _,
@@ -703,7 +798,7 @@ mod tests {
                 );
 
                 // access check should succeed
-                access_check_op(&op, grantee_user_id).unwrap();
+                access_check_op(&storage, &op, grantee_user_id).unwrap();
             }
         }
     }
@@ -714,12 +809,15 @@ mod tests {
         let actor_user_name = "box_access_check_ddl_test_user_actor";
         let user_under_test_name = "box_access_check_ddl_test_user";
 
+        let storage = Clusterwide::for_tests();
+
         let actor_user_id = make_user(actor_user_name, None);
         let user_under_test_id = make_user(user_under_test_name, None);
 
         // create works with passed id
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::CreateUser {
                     user_def: dummy_user_def(
                         123,
@@ -737,6 +835,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Create,
                 SchemaObjectType::User,
                 -1,
@@ -745,6 +844,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::CreateUser {
                     user_def: dummy_user_def(
                         123,
@@ -760,6 +860,7 @@ mod tests {
         // drop can be granted with wildcard, check on particular entity works
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::DropUser {
                     user_id: user_under_test_id,
                     schema_version: 0,
@@ -774,6 +875,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::User,
                 -1,
@@ -782,6 +884,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::DropUser {
                     user_id: user_under_test_id,
                     schema_version: 0,
@@ -794,6 +897,7 @@ mod tests {
         // alter can be granted with wildcard, check on particular entity works
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::ChangeAuth {
                     user_id: user_under_test_id,
                     auth: dummy_auth_def(),
@@ -809,6 +913,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Alter,
                 SchemaObjectType::User,
                 -1,
@@ -817,6 +922,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::ChangeAuth {
                     user_id: user_under_test_id,
                     auth: dummy_auth_def(),
@@ -828,12 +934,14 @@ mod tests {
         }
 
         revoke(
+            &storage,
             actor_user_id,
             PrivilegeType::Drop,
             SchemaObjectType::User,
             -1,
         );
         revoke(
+            &storage,
             actor_user_id,
             PrivilegeType::Alter,
             SchemaObjectType::User,
@@ -843,6 +951,7 @@ mod tests {
         // drop on particular entity works
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::DropUser {
                     user_id: user_under_test_id,
                     schema_version: 0,
@@ -857,6 +966,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::User,
                 user_under_test_id as i64,
@@ -865,6 +975,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::DropUser {
                     user_id: user_under_test_id,
                     schema_version: 0,
@@ -877,6 +988,7 @@ mod tests {
         // alter on particular entity works
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::ChangeAuth {
                     user_id: user_under_test_id,
                     auth: dummy_auth_def(),
@@ -892,6 +1004,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Alter,
                 SchemaObjectType::User,
                 user_under_test_id as i64,
@@ -900,6 +1013,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::ChangeAuth {
                     user_id: user_under_test_id,
                     auth: dummy_auth_def(),
@@ -934,10 +1048,10 @@ mod tests {
                 (PrivilegeType::Alter, "Alter", alter_op),
             ] {
                 // owner himself has permission on an object
-                access_check_op(&op, actor_user_id).unwrap();
+                access_check_op(&storage, &op, actor_user_id).unwrap();
 
                 // run access check for another user, it fails without grant
-                let e = access_check_op(&op, grantee_user_id).unwrap_err();
+                let e = access_check_op(&storage, &op, grantee_user_id).unwrap_err();
 
                 assert_eq!(
                     e.to_string(),
@@ -946,6 +1060,7 @@ mod tests {
 
                 // grant permission on behalf of the user owning the user
                 grant(
+                    &storage,
                     privilege,
                     SchemaObjectType::User,
                     user_id_grant as _,
@@ -954,7 +1069,7 @@ mod tests {
                 );
 
                 // access check should succeed
-                access_check_op(&op, grantee_user_id).unwrap();
+                access_check_op(&storage, &op, grantee_user_id).unwrap();
             }
         }
     }
@@ -964,6 +1079,7 @@ mod tests {
         let user_name = "box_access_check_ddl_test_role";
 
         let user_id = make_user(user_name, None);
+        let storage = Clusterwide::for_tests();
 
         let role_name = "box_access_check_ddl_test_role_some_role";
         let role_def = RoleDef {
@@ -984,6 +1100,7 @@ mod tests {
             };
 
             let e = access_check_acl(
+                &storage,
                 &Acl::CreateRole {
                     role_def: role_to_be_created.clone(),
                 },
@@ -997,6 +1114,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Create,
                 SchemaObjectType::Role,
                 -1,
@@ -1005,6 +1123,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::CreateRole {
                     role_def: role_to_be_created,
                 },
@@ -1016,6 +1135,7 @@ mod tests {
         // drop can be granted with wildcard, check on particular entity works
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::DropRole {
                     role_id: role_def.id,
                     schema_version: 0,
@@ -1030,6 +1150,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::Role,
                 -1,
@@ -1038,6 +1159,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::DropRole {
                     role_id: role_def.id,
                     schema_version: 0,
@@ -1047,11 +1169,18 @@ mod tests {
             .unwrap();
         }
 
-        revoke(user_id, PrivilegeType::Drop, SchemaObjectType::Role, -1);
+        revoke(
+            &storage,
+            user_id,
+            PrivilegeType::Drop,
+            SchemaObjectType::Role,
+            -1,
+        );
 
         // drop on particular entity works
         {
             let e = access_check_acl(
+                &storage,
                 &Acl::DropRole {
                     role_id: role_def.id,
                     schema_version: 0,
@@ -1066,6 +1195,7 @@ mod tests {
             );
 
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::Role,
                 role_def.id as i64,
@@ -1074,6 +1204,7 @@ mod tests {
             );
 
             access_check_acl(
+                &storage,
                 &Acl::DropRole {
                     role_id: role_def.id,
                     schema_version: 0,
@@ -1105,10 +1236,10 @@ mod tests {
             });
 
             // owner himself has permission on an object
-            access_check_op(&op, user_id).unwrap();
+            access_check_op(&storage, &op, user_id).unwrap();
 
             // run access check for another user, it fails without grant
-            let e = access_check_op(&op, grantee_user_id).unwrap_err();
+            let e = access_check_op(&storage, &op, grantee_user_id).unwrap_err();
 
             assert_eq!(
                     e.to_string(),
@@ -1117,6 +1248,7 @@ mod tests {
 
             // grant permission on behalf of the user owning the role
             grant(
+                &storage,
                 PrivilegeType::Drop,
                 SchemaObjectType::Role,
                 role_id_grant as _,
@@ -1125,7 +1257,79 @@ mod tests {
             );
 
             // access check should succeed
-            access_check_op(&op, grantee_user_id).unwrap();
+            access_check_op(&storage, &op, grantee_user_id).unwrap();
+        }
+    }
+
+    #[tarantool::test]
+    fn prohibit_circular_role_grant() {
+        let storage = Clusterwide::for_tests();
+
+        let create_role = |name| {
+            let id = next_user_id();
+            let role_def = RoleDef {
+                id,
+                name: String::from(name),
+                schema_version: 0,
+                owner: ADMIN_ID,
+            };
+
+            on_master_create_role(&role_def).expect("create role shouldn't fail");
+            global_create_role(&storage, &role_def).expect("create role shouldn't fail");
+            id
+        };
+
+        let parent_id = create_role("Parent");
+        let child_id = create_role("Child");
+
+        // circular grant: child to parent, parent to child should throw error
+        {
+            grant(
+                &storage,
+                PrivilegeType::Execute,
+                SchemaObjectType::Role,
+                child_id as i64,
+                parent_id,
+                None,
+            );
+
+            // grant child to parent
+            let privilege = PrivilegeDef::new(
+                PrivilegeType::Execute,
+                SchemaObjectType::Role,
+                child_id as i64,
+                parent_id,
+                ADMIN_ID,
+                0,
+            )
+            .unwrap();
+
+            global_grant_privilege(&storage, &privilege).unwrap();
+
+            // grant parent to child
+            let privilege = PrivilegeDef::new(
+                PrivilegeType::Execute,
+                SchemaObjectType::Role,
+                parent_id as i64,
+                child_id,
+                ADMIN_ID,
+                0,
+            )
+            .unwrap();
+
+            let e = access_check_acl(
+                &storage,
+                &Acl::GrantPrivilege {
+                    priv_def: privilege,
+                },
+                ADMIN_ID,
+            )
+            .unwrap_err();
+
+            assert_eq!(
+                e.to_string(),
+                format!("tarantool error: RoleLoop: Granting role Parent to role Child would create a loop"),
+            );
         }
     }
 }
diff --git a/src/cas.rs b/src/cas.rs
index 2fa33bf723..5d658df493 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -268,7 +268,7 @@ fn proc_cas_local(req: Request) -> Result<Response> {
 
     // Note: audit log record is automatically emmitted in case there is an error,
     // because it is hooked into AccessDenied error creation (on_access_denied) trigger
-    access_control::access_check_op(&req.op, req.as_user)?;
+    access_control::access_check_op(storage, &req.op, req.as_user)?;
 
     if let Op::Dml(dml) = &req.op {
         // Check if the requested dml is applicable to the local storage.
diff --git a/src/storage.rs b/src/storage.rs
index 823d43fca2..bb01fa205b 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1420,8 +1420,8 @@ impl ToEntryIter for Replicasets {
     type Entry = Replicaset;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -1489,8 +1489,8 @@ impl ToEntryIter for PeerAddresses {
     type Entry = traft::PeerAddress;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -1630,8 +1630,8 @@ impl ToEntryIter for Instances {
     type Entry = Instance;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -1675,10 +1675,10 @@ pub trait ToEntryIter {
     /// Target type for entry deserialization.
     type Entry;
 
-    fn index_iter(&self) -> Result<IndexIterator>;
+    fn index_iter(&self) -> tarantool::Result<IndexIterator>;
 
     #[inline(always)]
-    fn iter(&self) -> Result<EntryIter<Self::Entry>> {
+    fn iter(&self) -> tarantool::Result<EntryIter<Self::Entry>> {
         Ok(EntryIter::new(self.index_iter()?))
     }
 }
@@ -1902,8 +1902,8 @@ impl ToEntryIter for Tables {
     type Entry = TableDef;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -1996,8 +1996,8 @@ impl ToEntryIter for Indexes {
     type Entry = IndexDef;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -2281,8 +2281,8 @@ impl ToEntryIter for Users {
     type Entry = UserDef;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -2363,8 +2363,8 @@ impl ToEntryIter for Roles {
     type Entry = RoleDef;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -2518,8 +2518,8 @@ impl ToEntryIter for Privileges {
     type Entry = PrivilegeDef;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
@@ -2562,8 +2562,8 @@ impl ToEntryIter for Tiers {
     type Entry = Tier;
 
     #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
     }
 }
 
diff --git a/test/int/test_acl.py b/test/int/test_acl.py
index 4279464498..366d0d511b 100644
--- a/test/int/test_acl.py
+++ b/test/int/test_acl.py
@@ -991,5 +991,68 @@ def test_grant_and_revoke_default_users_privileges(cluster: Cluster):
     assert new_index == index
 
 
+# it's part of https://git.picodata.io/picodata/picodata/picodata/-/issues/421
+# https://git.picodata.io/picodata/tarantool/-/blob/82123356764155d1f4b294c0f2b96d919c2a8ec7/test/box/role.test.lua#L34
+def test_circular_grants_for_role(cluster: Cluster):
+    i1, *_ = cluster.deploy(instance_count=1)
+
+    i1.sql("CREATE ROLE Grandfather")
+
+    i1.sql("CREATE ROLE Father")
+
+    i1.sql("CREATE ROLE Son")
+
+    i1.sql("CREATE ROLE Daughter")
+
+    i1.sql("GRANT Father to Grandfather")
+
+    i1.sql("GRANT Son to Father")
+
+    i1.sql("GRANT Daughter to Father")
+
+    # Graph of grants isn't tree
+    i1.sql("GRANT Son to Daughter")
+
+    # At this point graph of grants looks like:
+    #               Grandfather
+    #              /
+    #             /
+    #            /--->   Father
+    #                /           \
+    #               /             \
+    #              /               \
+    #             /                 \
+    #            /                   \
+    #           /                     \
+    #          /->  Son <-- Daugther <-\
+
+    def throws_on_grant(grantee, granted):
+        with pytest.raises(
+            ReturnError,
+            match=f"Granting role {granted.upper()} to role {grantee.upper()} would create a loop",
+        ):
+            i1.sql(f"GRANT {granted} to {grantee}")
+
+    # Following grants creating a loop
+    throws_on_grant("Son", "Grandfather")
+    throws_on_grant("Son", "Father")
+    throws_on_grant("Son", "Daughter")
+    throws_on_grant("Daughter", "Grandfather")
+    throws_on_grant("Daughter", "Father")
+    throws_on_grant("Father", "Grandfather")
+
+    # Giving user nested roles is allowed
+    i1.sql(f"CREATE USER Dave WITH PASSWORD '{VALID_PASSWORD}'")
+
+    i1.sql("GRANT Grandfather to Dave")
+
+    i1.sql("GRANT Father to Dave")
+
+    i1.sql("GRANT Son to Dave")
+
+    # Granting role to itself isn't allowed
+    throws_on_grant("Son", "Son")
+
+
 # TODO: test acl get denied when there's an unfinished ddl
 # TODO: check various retryable cas outcomes when doing schema change requests
-- 
GitLab