From 6db5a2affc8d18637a6ff76a90435d7bc1ab88d4 Mon Sep 17 00:00:00 2001
From: EmirVildanov <reddog201030@gmail.com>
Date: Thu, 26 Oct 2023 13:00:52 +0300
Subject: [PATCH] feat: support sql GRANT/REVOKE privileges

---
 CHANGELOG.md         |   2 +
 src/sql.rs           | 155 +++++++++++-
 src/sql/pgproto.rs   |  10 +-
 src/storage.rs       |  10 +-
 test/int/test_sql.py | 581 ++++++++++++++++++++++++++++++++++---------
 5 files changed, 633 insertions(+), 125 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 10b190c5a6..b471eb8575 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -76,6 +76,8 @@ with the `YY.0M.MICRO` scheme.
 
 - _Clusterwide SQL_ introduces the capability to create users.
 
+- _Clusterwide SQL_ introduces the capability to grant and revoke privileges.
+
 - New clusterwide tables now have ids higher than any of the existing ones,
   instead of taking the first available id as it was previously. If there's
   already a table with the highest possible id, then the "wholes" in the id
diff --git a/src/sql.rs b/src/sql.rs
index f3847feb5f..983d2c42ef 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -1,7 +1,7 @@
 //! Clusterwide SQL query execution.
 
 use crate::schema::{
-    wait_for_ddl_commit, CreateTableParams, DistributionParam, Field, RoleDef, ShardingFn, UserDef,
+    wait_for_ddl_commit, CreateTableParams, DistributionParam, Field, RoleDef, ShardingFn, UserDef, PrivilegeDef
 };
 use crate::sql::pgproto::{
     with_portals, BoxedPortal, Describe, Descriptor, UserDescriptors, PG_PORTALS,
@@ -27,7 +27,7 @@ use sbroad::executor::protocol::{EncodedRequiredData, RequiredData};
 use sbroad::executor::result::ConsumerResult;
 use sbroad::executor::Query;
 use sbroad::frontend::Ast;
-use sbroad::ir::acl::Acl;
+use sbroad::ir::acl::{Acl, GrantRevokeType};
 use sbroad::ir::ddl::Ddl;
 use sbroad::ir::operator::Relational;
 use sbroad::ir::tree::traversal::{PostOrderWithFilter, REL_CAPACITY};
@@ -36,6 +36,7 @@ use sbroad::ir::{Node as IrNode, Plan as IrPlan};
 use sbroad::otm::{query_id, query_span, stat_query_span, OTM_CHAR_LIMIT};
 use serde::Deserialize;
 
+use crate::storage::Clusterwide;
 use ::tarantool::access_control::{box_access_check_space, PrivType};
 use ::tarantool::auth::{AuthData, AuthDef, AuthMethod};
 use ::tarantool::proc;
@@ -45,6 +46,7 @@ use ::tarantool::time::Instant;
 use ::tarantool::tuple::{RawBytes, Tuple};
 use std::collections::HashMap;
 use std::str::FromStr;
+use tarantool::session;
 
 pub mod pgproto;
 pub mod router;
@@ -390,6 +392,28 @@ impl TraftNode {
             .expect("_user rows must contain id column");
         Ok(max_tarantool_user_id + 1)
     }
+
+    /// Check whether passes user/role name is visible for current user .
+    fn check_user_role_available_for_user(&self, user_role_name: &String) -> traft::Result<()> {
+        let user_role = Space::from(SystemSpace::VUser)
+            .index("name")
+            .expect("_vuser should have a name index")
+            .get(&(user_role_name,))
+            .expect("name index selection from _vuser should succeed");
+        if user_role.is_some() {
+            let storage = &self.storage;
+            if storage.roles.by_name(user_role_name)?.is_some() {
+                return Ok(());
+            }
+            if storage.users.by_name(user_role_name)?.is_some() {
+                return Ok(());
+            }
+        }
+        Err(Error::Sbroad(SbroadError::Invalid(
+            Entity::Acl,
+            Some(format!("There is no role with name {user_role_name}")),
+        )))
+    }
 }
 
 fn check_password_min_length(
@@ -417,6 +441,54 @@ fn check_password_min_length(
     Ok(())
 }
 
+/// Get role or user UserId by its name.
+fn get_role_or_user_id(storage: &Clusterwide, grantee_name: &String) -> traft::Result<UserId> {
+    if let Some(grantee_user_def) = storage.users.by_name(grantee_name)? {
+        Ok(grantee_user_def.id)
+    } else if let Some(grantee_role_def) = storage.roles.by_name(grantee_name)? {
+        Ok(grantee_role_def.id)
+    } else {
+        // No existing user or role found.
+        Err(Error::Sbroad(SbroadError::Invalid(
+            Entity::Acl,
+            Some(format!(
+                "Nor user, neither role with name {grantee_name} exists"
+            )),
+        )))
+    }
+}
+
+/// Get ids of grantor and grantee.
+/// Apply common check on user/role/table names for passed `grant_type`.
+fn get_grantor_grantee_ids_with_checks(
+    node: &TraftNode,
+    grant_type: &GrantRevokeType,
+    grantee_name: &String,
+) -> traft::Result<(UserId, UserId)> {
+    let storage = &node.storage;
+    match grant_type {
+        GrantRevokeType::SpecificUser { user_name: user_role_name, .. }
+        | GrantRevokeType::SpecificRole { role_name: user_role_name, .. }
+        | GrantRevokeType::RolePass { role_name: user_role_name } => {
+            node.check_user_role_available_for_user(&user_role_name)?;
+        }
+        GrantRevokeType::SpecificTable { table_name, .. } => {
+            if storage.spaces.by_name(&table_name)?.is_none() {
+                return Err(Error::Sbroad(SbroadError::Invalid(
+                    Entity::Acl,
+                    Some(format!("There is no table with name {table_name}")),
+                )))
+            }
+        }
+        _ => {}
+    }
+
+    let grantor_id = session::uid()?;
+    let grantee_id = get_role_or_user_id(storage, grantee_name)?;
+
+    Ok((grantor_id, grantee_id))
+}
+
 fn reenterable_schema_change_request(
     node: &TraftNode,
     ir_node: IrNode,
@@ -513,6 +585,22 @@ fn reenterable_schema_change_request(
             let auth = AuthDef::new(method, data.into_string());
             Params::AlterUser(name, auth)
         }
+        IrNode::Acl(Acl::GrantPrivilege {
+            grant_type,
+            grantee_name,
+            ..
+        }) => {
+            // Nothing to check
+            Params::GrantPrivilege(grant_type, grantee_name)
+        }
+        IrNode::Acl(Acl::RevokePrivilege {
+            revoke_type,
+            grantee_name,
+            ..
+        }) => {
+            // Nothing to check
+            Params::RevokePrivilege(revoke_type, grantee_name)
+        }
         n => {
             unreachable!("this function should only be called for ddl or acl nodes, not {n:?}")
         }
@@ -626,7 +714,7 @@ fn reenterable_schema_change_request(
                     if entry_type == "user" {
                         return Err(Error::Sbroad(SbroadError::Invalid(
                             Entity::Acl,
-                            Some(format!("Unable to create role {name}. User with the same name already exists"))
+                            Some(format!("Unable to create role {name}. User with the same name already exists")),
                         )));
                     } else {
                         return Ok(ConsumerResult { row_count: 0 });
@@ -652,6 +740,65 @@ fn reenterable_schema_change_request(
                     schema_version: 0,
                 })
             }
+            Params::GrantPrivilege(
+                grant_type,
+                grantee_name
+            ) => {
+                let (grantor_id, grantee_id) = get_grantor_grantee_ids_with_checks(node, grant_type, grantee_name)?;
+                let (object_type, object_name, privilege) = grant_type.get_privelege_def_fields();
+
+                for priv_def in storage.privileges.get(
+                    grantee_id,
+                    object_type.as_str(),
+                    object_name.as_str(),
+                )? {
+                    if priv_def.privilege == privilege {
+                        // Privilege is already granted, no op needed.
+                        return Ok(ConsumerResult { row_count: 0 });
+                    }
+                }
+                Op::Acl(OpAcl::GrantPrivilege {
+                    priv_def: PrivilegeDef {
+                        grantor_id,
+                        grantee_id,
+                        object_type,
+                        object_name,
+                        privilege,
+                        // This will be set right after the match statement.
+                        schema_version: 0,
+                    }
+                })
+            }
+            Params::RevokePrivilege(revoke_type, grantee_name) => {
+                let (grantor_id, grantee_id) = get_grantor_grantee_ids_with_checks(node, revoke_type, grantee_name)?;
+                let (object_type, object_name, privilege) = revoke_type.get_privelege_def_fields();
+
+                let priv_exists = storage
+                    .privileges
+                    .get(grantee_id, object_type.as_str(), object_name.as_str())?
+                    .map(|priv_def| priv_def.privilege)
+                    .any(|existed_privilege| existed_privilege == privilege);
+                if !priv_exists {
+                    return Err(Error::Sbroad(SbroadError::Invalid(
+                        Entity::Acl,
+                        Some(format!(
+                            "Can't revoke {privilege}, because it hasn't been granted yet."
+                        )),
+                    )));
+                }
+
+                Op::Acl(OpAcl::RevokePrivilege {
+                    priv_def: PrivilegeDef {
+                        grantor_id,
+                        grantee_id,
+                        object_type,
+                        object_name,
+                        privilege,
+                        // This will be set right after the match statement.
+                        schema_version: 0,
+                    },
+                })
+            }
         };
         op.set_schema_version(storage.properties.next_schema_version()?);
         let is_ddl_prepare = matches!(op, Op::DdlPrepare { .. });
@@ -701,6 +848,8 @@ fn reenterable_schema_change_request(
         DropUser(String),
         CreateRole(String),
         DropRole(String),
+        GrantPrivilege(GrantRevokeType, String),
+        RevokePrivilege(GrantRevokeType, String),
     }
 }
 
diff --git a/src/sql/pgproto.rs b/src/sql/pgproto.rs
index 26175339c6..6a8b88b6ec 100644
--- a/src/sql/pgproto.rs
+++ b/src/sql/pgproto.rs
@@ -5,7 +5,7 @@ use crate::traft::{self, node};
 use crate::util::effective_user_id;
 use sbroad::errors::{Entity, SbroadError};
 use sbroad::executor::result::MetadataColumn;
-use sbroad::ir::acl::Acl;
+use sbroad::ir::acl::{Acl, GrantRevokeType};
 use sbroad::ir::ddl::Ddl;
 use sbroad::ir::expression::Expression;
 use sbroad::ir::operator::Relational;
@@ -285,6 +285,14 @@ impl TryFrom<&Node> for CommandTag {
                 Acl::DropRole { .. } | Acl::DropUser { .. } => Ok(CommandTag::DropRole),
                 Acl::CreateRole { .. } | Acl::CreateUser { .. } => Ok(CommandTag::CreateRole),
                 Acl::AlterUser { .. } => Ok(CommandTag::AlterRole),
+                Acl::GrantPrivilege { grant_type, .. } => match grant_type {
+                    GrantRevokeType::RolePass { .. } => Ok(CommandTag::GrantRole),
+                    _ => Ok(CommandTag::Grant),
+                },
+                Acl::RevokePrivilege { revoke_type, .. } => match revoke_type {
+                    GrantRevokeType::RolePass { .. } => Ok(CommandTag::RevokeRole),
+                    _ => Ok(CommandTag::Revoke),
+                },
             },
             Node::Ddl(ddl) => match ddl {
                 Ddl::DropTable { .. } => Ok(CommandTag::DropTable),
diff --git a/src/storage.rs b/src/storage.rs
index f05459bbd2..7b46973fbd 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -2386,10 +2386,12 @@ impl Privileges {
         &self,
         grantee_id: UserId,
         object_type: &str,
-        object_id: i64,
-    ) -> tarantool::Result<Option<PrivilegeDef>> {
-        let tuple = self.space.get(&(grantee_id, object_type, object_id))?;
-        tuple.as_ref().map(Tuple::decode).transpose()
+        object_name: &str,
+    ) -> tarantool::Result<EntryIter<PrivilegeDef>> {
+        let iter = self
+            .space
+            .select(IteratorType::Eq, &(grantee_id, object_type, object_name))?;
+        Ok(EntryIter::new(iter))
     }
 
     #[inline(always)]
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 15fe200f26..49e1cde7c8 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -9,6 +9,22 @@ from conftest import (
 )
 
 
+# Check that sql query execution does smth.
+def sql_ok(res):
+    assert res["row_count"] == 1
+
+
+# Check that sql query execution does nothing.
+def sql_no(res):
+    assert res["row_count"] == 0
+
+
+# Check that query execution raises with given error and message pattern.
+def raises(error, pattern, func):
+    with pytest.raises(error, match=pattern):
+        func()
+
+
 def test_pico_sql(cluster: Cluster):
     cluster.deploy(instance_count=1)
     i1 = cluster.instances[0]
@@ -669,20 +685,20 @@ def test_sql_acl_password_length(cluster: Cluster):
     password_short = "pwd"
     password_long = "password"
 
-    acl = i1.sql(
-        """
+    sql_ok(
+        i1.sql(
+            """
         create user {username} with password '{password}'
         using md5 option (timeout = 3)
     """.format(
-            username=username, password=password_long
+                username=username, password=password_long
+            )
         )
     )
-    assert acl["row_count"] == 1
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"drop user {username}"))
 
     with pytest.raises(ReturnError, match="password is too short"):
-        acl = i1.sql(
+        i1.sql(
             """
             create user {username} with password '{password}'
             using md5 option (timeout = 3)
@@ -691,20 +707,20 @@ def test_sql_acl_password_length(cluster: Cluster):
             )
         )
 
-    acl = i1.sql(
-        """
+    sql_ok(
+        i1.sql(
+            """
         create user {username} with password '{password}'
         using ldap option (timeout = 3)
     """.format(
-            username=username, password=password_short
+                username=username, password=password_short
+            )
         )
     )
-    assert acl["row_count"] == 1
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"drop user {username}"))
 
 
-def test_sql_acl_user(cluster: Cluster):
+def test_sql_acl_users_roles(cluster: Cluster):
     cluster.deploy(instance_count=2)
     i1, i2 = cluster.instances
 
@@ -723,12 +739,11 @@ def test_sql_acl_user(cluster: Cluster):
         create user "{username}" with password '{password}'
         using md5 option (timeout = 3)
     """
+        )
     )
-    assert acl["row_count"] == 1
 
     # Dropping user that doesn't exist should return 0.
-    acl = i1.sql(f"drop user {upper_username}")
-    assert acl["row_count"] == 0
+    sql_ok(i1.sql(f"drop user {upper_username}"))
 
     # Dropping user that does exist should return 1.
     acl = i1.sql(f'drop user "{username}"')
@@ -737,55 +752,51 @@ def test_sql_acl_user(cluster: Cluster):
 
     # All the usernames below should match the same user.
     # * Upcasted username in double parentheses shouldn't change.
-    acl = i1.sql(
-        f"""
+    sql_ok(
+        i1.sql(
+            f"""
         create user "{upper_username}" password '{password}'
         using chap-sha1
     """
+        )
     )
-    assert acl["row_count"] == 1
     # * Username as is in double parentheses.
-    acl = i1.sql(f'drop user "{upper_username}"')
-    assert acl["row_count"] == 1
-
-    acl = i1.sql(
-        f"""
+    sql_ok(i1.sql(f'drop user "{upper_username}"'))
+    sql_ok(
+        i1.sql(
+            f"""
         create user "{upper_username}" password '' using ldap
     """
+        )
     )
-    assert acl["row_count"] == 1
     # * Username without parentheses should be upcasted.
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"drop user {username}"))
     # * Username without parentheses should be upcasted.
-    acl = i1.sql(
-        f"""
+    sql_ok(
+        i1.sql(
+            f"""
         create user {username} with password '{password}'
         option (timeout = 3)
     """
+        )
     )
-    assert acl["row_count"] == 1
-    acl = i1.sql(f'drop user "{upper_username}"')
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f'drop user "{upper_username}"'))
 
     # Check user creation with LDAP works well with non-empty password specification
     # (it must be ignored).
-    acl = i1.sql(
-        f"""
+    sql_ok(
+        i1.sql(
+            f"""
         create user "{upper_username}" password 'smth' using ldap
     """
+        )
     )
-    assert acl["row_count"] == 1
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"drop user {username}"))
 
     # We can safely retry creating the same user.
-    acl = i1.sql(f"create user {username} with password '{password}' using md5")
-    assert acl["row_count"] == 1
-    acl = i1.sql(f"create user {username} with password '{password}' using md5")
-    assert acl["row_count"] == 0
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"create user {username} with password '{password}' using md5"))
+    sql_no(i1.sql(f"create user {username} with password '{password}' using md5"))
+    sql_ok(i1.sql(f"drop user {username}"))
 
     # Zero timeout should return timeout error.
     with pytest.raises(ReturnError, match="timeout"):
@@ -827,13 +838,11 @@ def test_sql_acl_user(cluster: Cluster):
     with pytest.raises(ReturnError, match="already exists with different auth method"):
         i1.sql(f"create user {username} with password '123456789' using md5")
         i1.sql(f"create user {username} with password '987654321' using md5")
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"drop user {username}"))
 
     another_password = "qwerty123"
     # Alter of unexisted user should do nothing.
-    acl = i1.sql(f"alter user \"nobody\" with password '{another_password}'")
-    assert acl["row_count"] == 0
+    sql_no(i1.sql(f"alter user \"nobody\" with password '{another_password}'"))
 
     # Check altering works.
     acl = i1.sql(f"create user {username} with password '{password}' using md5")
@@ -864,23 +873,20 @@ def test_sql_acl_user(cluster: Cluster):
     users_auth_became = i1.call("box.space._pico_user:select")[2][3]
     assert users_auth_was[0] != users_auth_became[0]
     assert users_auth_became[1] == ""
-    acl = i1.sql(f"drop user {username}")
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"drop user {username}"))
 
     # Attempt to create role with the name of already existed user
     # should lead to an error.
-    acl = i1.sql(f""" create user "{username}" with password '123456789' using md5 """)
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f"create user \"{username}\" with password '123456789' using md5"))
     with pytest.raises(ReturnError, match="User with the same name already exists"):
         i1.sql(f'create role "{username}"')
+    sql_ok(i1.sql(f'drop user "{username}"'))
 
     # Dropping role that doesn't exist should return 0.
-    acl = i1.sql(f"drop role {rolename}")
-    assert acl["row_count"] == 0
+    sql_no(i1.sql(f"drop role {rolename}"))
 
     # Successive creation of role.
-    acl = i1.sql(f'create role "{rolename}"')
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f'create role "{rolename}"'))
     # Unable to alter role.
     with pytest.raises(
         ReturnError, match=f"Role {rolename} exists. Unable to alter role."
@@ -888,8 +894,7 @@ def test_sql_acl_user(cluster: Cluster):
         i1.sql(f"alter user \"{rolename}\" with password '{password}'")
 
     # Creation of the role that already exists shouldn't do anything.
-    acl = i1.sql(f'create role "{rolename}"')
-    assert acl["row_count"] == 0
+    sql_no(i1.sql(f'create role "{rolename}"'))
 
     # Dropping role that does exist should return 1.
     acl = i1.sql(f'drop role "{rolename}"')
@@ -897,18 +902,333 @@ def test_sql_acl_user(cluster: Cluster):
     assert i1.call("box.space._pico_role:select") == default_roles
 
     # All the rolenames below should match the same role.
-    acl = i1.sql(f'create role "{upper_rolename}"')
-    assert acl["row_count"] == 1
-    acl = i1.sql(f"drop role {upper_rolename}")
-    assert acl["row_count"] == 1
-    acl = i1.sql(f'create role "{upper_rolename}"')
-    assert acl["row_count"] == 1
-    acl = i1.sql(f"drop role {rolename}")
-    assert acl["row_count"] == 1
-    acl = i1.sql(f'create role "{upper_rolename}"')
-    assert acl["row_count"] == 1
-    acl = i1.sql(f'drop role "{upper_rolename}"')
-    assert acl["row_count"] == 1
+    sql_ok(i1.sql(f'create role "{upper_rolename}"'))
+    sql_ok(i1.sql(f"drop role {upper_rolename}"))
+    sql_ok(i1.sql(f'create role "{upper_rolename}"'))
+    sql_ok(i1.sql(f"drop role {rolename}"))
+    sql_ok(i1.sql(f'create role "{upper_rolename}"'))
+    sql_ok(i1.sql(f'drop role "{upper_rolename}"'))
+
+
+# TODO: replace all Lua `i1.call`s to SQL `iq.sql`.
+# See https://git.picodata.io/picodata/picodata/sbroad/-/issues/492.
+def test_sql_acl_privileges(cluster: Cluster):
+    cluster.deploy(instance_count=2)
+    i1, i2 = cluster.instances
+
+    username = "USER"
+    another_username = "ANOTHER_USER"
+    password = "PASSWORD"
+    rolename = "ROLE"
+    another_rolename = "ANOTHER_ROLE"
+
+    # Create users.
+    sql_ok(i1.sql(f"create user {username} with password '{password}'"))
+    sql_ok(i1.sql(f"create user {another_username} with password '{password}'"))
+    # Create roles.
+    sql_ok(i1.sql(f"create role {rolename}"))
+    sql_ok(i1.sql(f"create role {another_rolename}"))
+    # Create tables.
+    table_name = "T1"
+    another_table_name = "T2"
+    sql_ok(
+        i1.sql(
+            f"""
+        create table {table_name} ("a" int not null, primary key ("a"))
+        distributed by ("a")
+    """
+        )
+    )
+    sql_ok(
+        i1.sql(
+            f"""
+        create table {another_table_name} ("a" int not null, primary key ("a"))
+        distributed by ("a")
+    """
+        )
+    )
+
+    # Grant remote functions call.
+    i1.call("pico.grant_privilege", username, "execute", "universe", None)
+
+    # =========================ERRORs======================
+    # Attempt to grant unsupported privileges.
+    raises(
+        ReturnError,
+        r"Supported privileges are: \[Read, Write, Alter, Drop\]",
+        lambda: i1.sql(f""" grant create on table {table_name} to {username} """),
+    )
+    raises(
+        ReturnError,
+        r"Supported privileges are: \[Create, Alter, Drop\]",
+        lambda: i1.sql(f""" grant read user to {username} """),
+    )
+    raises(
+        ReturnError,
+        r"Supported privileges are: \[Alter, Drop\]",
+        lambda: i1.sql(f""" grant create on user {username} to {rolename} """),
+    )
+    raises(
+        ReturnError,
+        r"Supported privileges are: \[Create, Drop\]",
+        lambda: i1.sql(f""" grant alter role to {username} """),
+    )
+    raises(
+        ReturnError,
+        r"Supported privileges are: \[Drop\]",
+        lambda: i1.sql(f""" grant create on role {rolename} to {username} """),
+    )
+
+    # Attempt to grant unexisted role.
+    raises(
+        ReturnError,
+        "There is no role with name SUPER",
+        lambda: i1.sql(f""" grant SUPER to {username} """),
+    )
+    # Attempt to grant TO unexisted role.
+    raises(
+        ReturnError,
+        "Nor user, neither role with name SUPER exists",
+        lambda: i1.sql(f""" grant {rolename} to SUPER """),
+    )
+    # Attempt to revoke unexisted role.
+    raises(
+        ReturnError,
+        "There is no role with name SUPER",
+        lambda: i1.sql(f""" revoke SUPER from {username} """),
+    )
+    # Attempt to revoke privilege that hasn't been granted yet do noting.
+    sql_no(i1.sql(f""" revoke read on table {table_name} from {username} """))
+    # TODO: Attempt to grant role that doesn't visible for user.
+    # TODO: Attempt to revoke role that doesn't visible for user.
+    # TODO: Attempt to grant TO a user that doesn't visible for user.
+    # TODO: Attempt to grant TO a role that doesn't visible for user.
+    # TODO: Attempt to revoke FROM a user that doesn't visible for user.
+    # TODO: Attempt to revoke FROM a role that doesn't visible for user.
+
+    # TODO: ================USERs interaction================
+    # * TODO: User creation is prohibited.
+    # * Grant CREATE to user.
+    sql_ok(i1.sql(f""" grant create user to {username} """))
+    # * Check privileges table is updated.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"][1]
+    assert privs_rows[2] == "user"
+    assert privs_rows[3] == ""
+    assert privs_rows[4] == "create"
+    # * TODO: User creation is available.
+    # * Revoke CREATE from user.
+    sql_ok(i1.sql(f""" revoke create user from {username} """))
+    # * Check privileges table is updated.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
+    assert len(privs_rows) == 1  # universe
+    # * TODO: Check that user with granted privileges can ALTER and DROP created user
+    #         as it's the owner.
+    # * TODO: Revoke automatically granted privileges.
+    # * TODO: Check ALTER and DROP are prohibited.
+    # * Grant global ALTER on users.
+    sql_ok(i1.sql(f""" grant alter user to {username} """))
+    # * Check privileges table is updated.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
+    assert len(privs_rows) == 2  # universe + alter user
+    # * TODO: Check ALTER is available.
+    # * Revoke global ALTER.
+    sql_ok(i1.sql(f""" revoke alter user from {username} """))
+
+    # * TODO: Check another user can't initially interact with previously created new user.
+    # * TODO: Grant ALTER and DROP user privileges to another user.
+    # * TODO: Check user alternation is available.
+    # * TODO: Check user drop is available.
+
+    # TODO: ================ROLEs interaction================
+    # * TODO: Role creation is prohibited.
+    # * Grant CREATE to user.
+    sql_ok(i1.sql(f""" grant create role to {username} """))
+    # * Check privileges table is updated.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"][0]
+    assert privs_rows[2] == "role"
+    assert privs_rows[3] == ""
+    assert privs_rows[4] == "create"
+    # * TODO: Role creation is available.
+    # * Revoke CREATE from user.
+    sql_ok(i1.sql(f""" revoke create role from {username} """))
+    # * Check privileges table is updated.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
+    assert len(privs_rows) == 1  # universe
+    # * TODO: Check that user with granted privileges can DROP created role as it's the owner.
+    # * TODO: Revoke automatically granted privileges.
+    # * TODO: Check DROP are prohibited.
+    # * Grant global drop on role.
+    sql_ok(i1.sql(f""" grant drop role to {username} """))
+    # * Check privileges table is updated.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
+    assert len(privs_rows) == 2  # universe + drop user
+    # * TODO: Check DROP is available.
+    # * Revoke global DROP.
+    sql_ok(i1.sql(f""" revoke drop role from {username} """))
+
+    # * TODO: Check another user can't initially interact with previously created new role.
+    # * TODO: Grant DROP role privileges to another user.
+    # * TODO: Check role drop is available.
+
+    # TODO: ================TABLEs interaction===============
+    # ------------------READ---------------------------------
+    # * READ is not available.
+    raises(
+        ReturnError,
+        rf"Read access to space '{table_name}' is denied for user '{username}'",
+        lambda: i1.sql(
+            f""" select * from {table_name} """, user=username, password=password
+        ),
+    )
+    # * Grant READ to user.
+    sql_ok(i1.sql(f""" grant read on table {table_name} to {username} """))
+    # * Granting already granted privilege do nothing.
+    sql_no(i1.sql(f""" grant read on table {table_name} to {username} """))
+    # * After grant READ succeeds.
+    i1.sql(f""" select * from {table_name} """, user=username, password=password)
+    # * Revoke READ.
+    sql_ok(i1.sql(f""" revoke read on table {table_name} from {username} """))
+    # * After revoke READ fails again.
+    raises(
+        ReturnError,
+        rf"Read access to space '{table_name}' is denied for user '{username}'",
+        lambda: i1.sql(
+            f""" select * from {table_name} """, user=username, password=password
+        ),
+    )
+    # ------------------WRITE---------------------------------
+    # TODO: remove
+    sql_ok(i1.sql(f""" grant read on table {table_name} to {username} """))
+    # * WRITE is not available.
+    raises(
+        ReturnError,
+        rf"Write access to space '{table_name}' is denied for user '{username}'",
+        lambda: i1.sql(
+            f""" insert into {table_name} values (1) """,
+            user=username,
+            password=password,
+        ),
+    )
+    # * Grant WRITE to user.
+    sql_ok(i1.sql(f""" grant write on table {table_name} to {username} """))
+    # * WRITE succeeds.
+    i1.sql(
+        f""" insert into {table_name} values (1) """, user=username, password=password
+    )
+    i1.sql(f""" delete from {table_name} where "a" = 1 """)
+    # * Revoke WRITE from role.
+    sql_ok(i1.sql(f""" revoke write on table {table_name} from {username} """))
+    # * WRITE fails again.
+    raises(
+        ReturnError,
+        rf"Write access to space '{table_name}' is denied for user '{username}'",
+        lambda: i1.sql(
+            f""" insert into {table_name} values (1) """,
+            user=username,
+            password=password,
+        ),
+    )
+    # TODO: remove
+    sql_ok(i1.sql(f""" revoke read on table {table_name} from {username} """))
+    # ------------------CREATE---------------------------------
+    # * TODO: Unable to create table.
+    # * Grant CREATE to user.
+    sql_ok(i1.sql(f""" grant create table to {username} """))
+    # * TODO: Creation is available.
+    # * TODO: Check user can do everything he wants on a table he created:
+    # ** READ.
+    # ** WRITE.
+    # ** CREATE index.
+    # ** ALTER index.
+    # ** DROP.
+    # * Revoke CREATE from user.
+    sql_ok(i1.sql(f""" revoke create table from {username} """))
+    # * TODO: Creation is not available again.
+    # ------------------ALTER--------------------------------
+    # * TODO: Unable to create new table index.
+    # * Grant ALTER to user.
+    sql_ok(i1.sql(f""" grant alter on table {table_name} to {username} """))
+    # * TODO: Index creation succeeds.
+    # * Revoke ALTER from user.
+    sql_ok(i1.sql(f""" revoke alter on table {table_name} from {username} """))
+    # * TODO: Attempt to remove index fails.
+    # ------------------DROP---------------------------------
+    # * TODO: Unable to drop table previously created by admin.
+    # * Grant DROP to user.
+    sql_ok(i1.sql(f""" grant drop on table {table_name} to {username} """))
+    # * TODO: Able to drop admin table.
+    # * Revoke DROP from user.
+    sql_ok(i1.sql(f""" revoke drop on table {table_name} from {username} """))
+
+    # Grant global tables READ, WRITE, ALTER, DROP.
+    sql_ok(i1.sql(f""" grant read table to {username} """))
+    sql_ok(i1.sql(f""" grant write table to {username} """))
+    sql_ok(i1.sql(f""" grant alter table to {username} """))
+    sql_ok(i1.sql(f""" grant drop table to {username} """))
+    # Check all operations available on another_table created by admin.
+    i1.sql(
+        f""" select * from {another_table_name} """, user=username, password=password
+    )
+    i1.sql(
+        f""" insert into {another_table_name} values (1) """,
+        user=username,
+        password=password,
+    )
+    i1.sql(f""" delete from {another_table_name} """, user=username, password=password)
+    i1.eval(
+        f"""
+        box.space.{another_table_name}:create_index('some', {{ parts = {{ 'a' }} }})
+    """
+    )
+    i1.sql(f""" delete from {another_table_name} """, user=username, password=password)
+    sql_ok(
+        i1.sql(
+            f""" drop table {another_table_name} """, user=username, password=password
+        )
+    )
+    # Revoke global privileges
+    sql_ok(i1.sql(f""" revoke read table from {username} """))
+    sql_ok(i1.sql(f""" revoke write table from {username} """))
+    sql_ok(i1.sql(f""" revoke alter table from {username} """))
+    sql_ok(i1.sql(f""" revoke drop table from {username} """))
+
+    # ================ROLE passing================
+    # * Check there are no privileges granted to anything initially.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
+    assert len(privs_rows) == 1  # universe
+    # * Read from table is prohibited for user initially.
+    raises(
+        ReturnError,
+        rf"Read access to space '{table_name}' is denied for user '{username}'",
+        lambda: i1.sql(
+            f""" select * from {table_name} """, user=username, password=password
+        ),
+    )
+    # * Grant table READ and WRITE to role.
+    sql_ok(i1.sql(f""" grant read on table {table_name} to {rolename} """))
+    sql_ok(i1.sql(f""" grant write on table {table_name} to {rolename} """))
+    # * Grant ROLE to user.
+    sql_ok(i1.sql(f""" grant {rolename} to {username} """))
+    # * Check read and write is available for user.
+    i1.sql(f""" select * from {table_name} """, user=username, password=password)
+    i1.sql(
+        f""" insert into {table_name} values (1) """, user=username, password=password
+    )
+    i1.sql(f""" delete from {table_name} where "a" = 1 """)
+    # * Revoke privileges from role.
+    sql_ok(i1.sql(f""" revoke write on table {table_name} from {rolename} """))
+    sql_ok(i1.sql(f""" revoke read on table {table_name} from {rolename} """))
+    # * Check privilege revoked from role and user.
+    privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
+    assert len(privs_rows) == 2  # universe, role for user
+    # * Check read is prohibited again.
+    raises(
+        ReturnError,
+        rf"Read access to space '{table_name}' is denied for user '{username}'",
+        lambda: i1.sql(
+            f""" select * from {table_name} """, user=username, password=password
+        ),
+    )
 
 
 def test_distributed_sql_via_set_language(cluster: Cluster):
@@ -961,96 +1281,123 @@ def test_sql_privileges(cluster: Cluster):
     cluster.deploy(instance_count=1)
     i1 = cluster.instances[0]
 
+    table_name = "t"
     # Create a test table
-    ddl = i1.sql(
-        """
-        create table "t" ("a" int not null, "b" int, primary key ("a"))
+    sql_ok(
+        i1.sql(
+            f"""
+        create table "{table_name}" ("a" int not null, "b" int, primary key ("a"))
         using memtx
         distributed by ("a")
         option (timeout = 3)
     """
+        )
     )
-    assert ddl["row_count"] == 1
 
-    # Create user with execute on universe privilege
+    username = "alice"
     alice_pwd = "1234567890"
-    acl = i1.sql(
-        f"""
-        create user "alice" with password '{alice_pwd}'
+
+    # Create user with execute on universe privilege
+    sql_ok(
+        i1.sql(
+            f"""
+        create user "{username}" with password '{alice_pwd}'
         using chap-sha1 option (timeout = 3)
-        """
+    """
+        )
     )
-    assert acl["row_count"] == 1
-    i1.eval(""" pico.grant_privilege("alice", "execute", "universe") """)
+    i1.eval(f""" pico.grant_privilege("{username}", "execute", "universe") """)
 
     # ------------------------
     # Check SQL read privilege
     # ------------------------
-    with pytest.raises(ReturnError, match="AccessDenied: Read access to space 't'"):
-        i1.sql(""" select * from "t" """, user="alice", password=alice_pwd)
-
+    raises(
+        ReturnError,
+        f"AccessDenied: Read access to space '{table_name}'",
+        lambda: i1.sql(f""" select * from "{table_name}" """, user=username, password=alice_pwd),
+    )
     # Grant read privilege
-    i1.eval(""" pico.grant_privilege("alice", "read", "table", "t") """)
-    dql = i1.sql(""" select * from "t" """, user="alice", password=alice_pwd)
+    i1.eval(f""" pico.grant_privilege("{username}", "read", "space", "{table_name}") """)
+    dql = i1.sql(f""" select * from "{table_name}" """, user=username, password=alice_pwd)
     assert dql["rows"] == []
 
     # Revoke read privilege
-    i1.eval(""" pico.revoke_privilege("alice", "read", "table", "t") """)
+    i1.eval(f""" pico.revoke_privilege("{username}", "read", "space", "{table_name}") """)
 
     # -------------------------
     # Check SQL write privilege
     # -------------------------
-    with pytest.raises(ReturnError, match="AccessDenied: Write access to space 't'"):
-        i1.sql(""" insert into "t" values (1, 2) """, user="alice", password=alice_pwd)
+    raises(
+        ReturnError,
+        f"AccessDenied: Write access to space '{table_name}'",
+        lambda: i1.sql(f""" insert into "{table_name}" values (1, 2) """, user=username, password=alice_pwd)
+    )
 
     # Grant write privilege
-    i1.eval(""" pico.grant_privilege("alice", "write", "table", "t") """)
-    dml = i1.sql(
-        """ insert into "t" values (1, 2) """, user="alice", password=alice_pwd
-    )
-    assert dml["row_count"] == 1
+    i1.eval(f""" pico.grant_privilege("{username}", "write", "space", "{table_name}") """)
+    sql_ok(i1.sql(
+        f""" insert into "{table_name}" values (1, 2) """, user=username, password=alice_pwd
+    ))
 
     # Revoke write privilege
-    i1.eval(""" pico.revoke_privilege("alice", "write", "table", "t") """)
+    i1.eval(f""" pico.revoke_privilege("{username}", "write", "space", "{table_name}") """)
 
     # -----------------------------------
     # Check SQL write and read privileges
     # -----------------------------------
-    with pytest.raises(ReturnError, match="AccessDenied: Read access to space 't'"):
-        i1.sql(
-            """ insert into "t" select "a" + 1, "b" from "t"  """,
-            user="alice",
+    raises(
+        ReturnError,
+        f"AccessDenied: Read access to space '{table_name}'",
+        lambda: i1.sql(
+            f""" insert into "{table_name}" select "a" + 1, "b" from "{table_name}"  """,
+            user=username,
             password=alice_pwd,
         )
-    with pytest.raises(ReturnError, match="AccessDenied: Read access to space 't'"):
-        i1.sql(""" update "t" set "b" = 42 """, user="alice", password=alice_pwd)
-    with pytest.raises(ReturnError, match="AccessDenied: Read access to space 't'"):
-        i1.sql(""" delete from "t" """, user="alice", password=alice_pwd)
+    )
+    raises(
+        ReturnError,
+        f"AccessDenied: Read access to space '{table_name}'",
+        lambda: i1.sql(f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd)
+    )
+    raises(
+        ReturnError,
+        f"AccessDenied: Read access to space '{table_name}'",
+        lambda: i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
+    )
 
     # Grant read privilege
-    i1.eval(""" pico.grant_privilege("alice", "read", "table", "t") """)
+    i1.eval(f""" pico.grant_privilege("{username}", "read", "space", "{table_name}") """)
 
-    with pytest.raises(ReturnError, match="AccessDenied: Write access to space 't'"):
-        i1.sql(
-            """ insert into "t" select "a" + 1, "b" from "t"  """,
-            user="alice",
+    raises(
+        ReturnError,
+        f"AccessDenied: Write access to space '{table_name}'",
+        lambda: i1.sql(
+            f""" insert into "{table_name}" select "a" + 1, "b" from "{table_name}"  """,
+            user=username,
             password=alice_pwd,
         )
-    with pytest.raises(ReturnError, match="AccessDenied: Write access to space 't'"):
-        i1.sql(""" update "t" set "b" = 42 """, user="alice", password=alice_pwd)
-    with pytest.raises(ReturnError, match="AccessDenied: Write access to space 't'"):
-        i1.sql(""" delete from "t" """, user="alice", password=alice_pwd)
+    )
+    raises(
+        ReturnError,
+        f"AccessDenied: Write access to space '{table_name}'",
+        lambda: i1.sql(f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd)
+    )
+    raises(
+        ReturnError,
+        f"AccessDenied: Write access to space '{table_name}'",
+        lambda: i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
+    )
 
     # Grant write privilege
-    i1.eval(""" pico.grant_privilege("alice", "write", "table", "t") """)
+    i1.eval(f""" pico.grant_privilege("{username}", "write", "space", "{table_name}") """)
 
     dml = i1.sql(
-        """ insert into "t" select "a" + 1, "b" from "t"  """,
-        user="alice",
+        f""" insert into "{table_name}" select "a" + 1, "b" from "{table_name}"  """,
+        user=username,
         password=alice_pwd,
     )
     assert dml["row_count"] == 1
-    dml = i1.sql(""" update "t" set "b" = 42 """, user="alice", password=alice_pwd)
+    dml = i1.sql(f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd)
     assert dml["row_count"] == 2
-    dml = i1.sql(""" delete from "t" """, user="alice", password=alice_pwd)
+    dml = i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
     assert dml["row_count"] == 2
-- 
GitLab