Skip to content
Snippets Groups Projects
Commit c3729a95 authored by EmirVildanov's avatar EmirVildanov
Browse files

feat: removed helper assertions methods

parent 71474f57
No related branches found
No related tags found
1 merge request!707feat: support sql GRANT/REVOKE privileges and ALTER USER Login/NoLogin options
......@@ -9,22 +9,6 @@ from conftest import (
)
# Check that sql query execution does smth.
def sql_ok(res):
assert res["row_count"] == 1
# Check that sql query execution does nothing.
def sql_no(res):
assert res["row_count"] == 0
# Check that query execution raises with given error and message pattern.
def raises(error, pattern, func):
with pytest.raises(error, match=pattern):
func()
def test_pico_sql(cluster: Cluster):
cluster.deploy(instance_count=1)
i1 = cluster.instances[0]
......@@ -685,17 +669,15 @@ def test_sql_acl_password_length(cluster: Cluster):
password_short = "pwd"
password_long = "password"
sql_ok(
i1.sql(
"""
create user {username} with password '{password}'
using md5 option (timeout = 3)
""".format(
username=username, password=password_long
)
)
acl = i1.sql(
f"""
create user {username} with password '{password_long}'
using md5 option (timeout = 3)
"""
)
sql_ok(i1.sql(f"drop user {username}"))
assert acl["row_count"] == 1
acl = i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
with pytest.raises(ReturnError, match="password is too short"):
i1.sql(
......@@ -707,17 +689,15 @@ def test_sql_acl_password_length(cluster: Cluster):
)
)
sql_ok(
i1.sql(
"""
create user {username} with password '{password}'
using ldap option (timeout = 3)
""".format(
username=username, password=password_short
)
)
acl = i1.sql(
f"""
create user {username} with password '{password_short}'
using ldap option (timeout = 3)
"""
)
sql_ok(i1.sql(f"drop user {username}"))
assert acl["row_count"] == 1
acl = i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
def test_sql_acl_users_roles(cluster: Cluster):
......@@ -739,11 +719,12 @@ def test_sql_acl_users_roles(cluster: Cluster):
create user "{username}" with password '{password}'
using md5 option (timeout = 3)
"""
)
)
assert acl["row_count"] == 1
# Dropping user that doesn't exist should return 0.
sql_ok(i1.sql(f"drop user {upper_username}"))
acl = i1.sql(f"drop user {upper_username}")
assert acl["row_count"] == 0
# Dropping user that does exist should return 1.
acl = i1.sql(f'drop user "{username}"')
......@@ -752,51 +733,56 @@ def test_sql_acl_users_roles(cluster: Cluster):
# All the usernames below should match the same user.
# * Upcasted username in double parentheses shouldn't change.
sql_ok(
i1.sql(
f"""
acl = i1.sql(
f"""
create user "{upper_username}" password '{password}'
using chap-sha1
"""
)
)
assert acl["row_count"] == 1
# * Username as is in double parentheses.
sql_ok(i1.sql(f'drop user "{upper_username}"'))
sql_ok(
i1.sql(
f"""
acl = i1.sql(f'drop user "{upper_username}"')
assert acl["row_count"] == 1
i1.sql(
f"""
create user "{upper_username}" password '' using ldap
"""
)
)
assert acl["row_count"] == 1
# * Username without parentheses should be upcasted.
sql_ok(i1.sql(f"drop user {username}"))
i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
# * Username without parentheses should be upcasted.
sql_ok(
i1.sql(
f"""
acl = i1.sql(
f"""
create user {username} with password '{password}'
option (timeout = 3)
"""
)
)
sql_ok(i1.sql(f'drop user "{upper_username}"'))
assert acl["row_count"] == 1
acl = i1.sql(f'drop user "{upper_username}"')
assert acl["row_count"] == 1
# Check user creation with LDAP works well with non-empty password specification
# (it must be ignored).
sql_ok(
i1.sql(
f"""
acl = i1.sql(
f"""
create user "{upper_username}" password 'smth' using ldap
"""
)
)
sql_ok(i1.sql(f"drop user {username}"))
assert acl["row_count"] == 1
acl = i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
# We can safely retry creating the same user.
sql_ok(i1.sql(f"create user {username} with password '{password}' using md5"))
sql_no(i1.sql(f"create user {username} with password '{password}' using md5"))
sql_ok(i1.sql(f"drop user {username}"))
acl = i1.sql(f"create user {username} with password '{password}' using md5")
assert acl["row_count"] == 1
acl = i1.sql(f"create user {username} with password '{password}' using md5")
assert acl["row_count"] == 0
acl = i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
# Zero timeout should return timeout error.
with pytest.raises(ReturnError, match="timeout"):
......@@ -838,11 +824,13 @@ def test_sql_acl_users_roles(cluster: Cluster):
with pytest.raises(ReturnError, match="already exists with different auth method"):
i1.sql(f"create user {username} with password '123456789' using md5")
i1.sql(f"create user {username} with password '987654321' using md5")
sql_ok(i1.sql(f"drop user {username}"))
acl = i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
another_password = "qwerty123"
# Alter of unexisted user should do nothing.
sql_no(i1.sql(f"alter user \"nobody\" with password '{another_password}'"))
acl = i1.sql(f"alter user \"nobody\" with password '{another_password}'")
assert acl["row_count"] == 0
# Check altering works.
acl = i1.sql(f"create user {username} with password '{password}' using md5")
......@@ -873,20 +861,25 @@ def test_sql_acl_users_roles(cluster: Cluster):
users_auth_became = i1.call("box.space._pico_user:select")[2][3]
assert users_auth_was[0] != users_auth_became[0]
assert users_auth_became[1] == ""
sql_ok(i1.sql(f"drop user {username}"))
acl = i1.sql(f"drop user {username}")
assert acl["row_count"] == 1
# Attempt to create role with the name of already existed user
# should lead to an error.
sql_ok(i1.sql(f"create user \"{username}\" with password '123456789' using md5"))
acl = i1.sql(f"create user \"{username}\" with password '123456789' using md5")
assert acl["row_count"] == 1
with pytest.raises(ReturnError, match="User with the same name already exists"):
i1.sql(f'create role "{username}"')
sql_ok(i1.sql(f'drop user "{username}"'))
acl = i1.sql(f'drop user "{username}"')
assert acl["row_count"] == 1
# Dropping role that doesn't exist should return 0.
sql_no(i1.sql(f"drop role {rolename}"))
acl = i1.sql(f"drop role {rolename}")
assert acl["row_count"] == 0
# Successive creation of role.
sql_ok(i1.sql(f'create role "{rolename}"'))
acl = i1.sql(f'create role "{rolename}"')
assert acl["row_count"] == 1
# Unable to alter role.
with pytest.raises(
ReturnError, match=f"Role {rolename} exists. Unable to alter role."
......@@ -894,7 +887,8 @@ def test_sql_acl_users_roles(cluster: Cluster):
i1.sql(f"alter user \"{rolename}\" with password '{password}'")
# Creation of the role that already exists shouldn't do anything.
sql_no(i1.sql(f'create role "{rolename}"'))
acl = i1.sql(f'create role "{rolename}"')
assert acl["row_count"] == 0
# Dropping role that does exist should return 1.
acl = i1.sql(f'drop role "{rolename}"')
......@@ -902,12 +896,18 @@ def test_sql_acl_users_roles(cluster: Cluster):
assert i1.call("box.space._pico_role:select") == default_roles
# All the rolenames below should match the same role.
sql_ok(i1.sql(f'create role "{upper_rolename}"'))
sql_ok(i1.sql(f"drop role {upper_rolename}"))
sql_ok(i1.sql(f'create role "{upper_rolename}"'))
sql_ok(i1.sql(f"drop role {rolename}"))
sql_ok(i1.sql(f'create role "{upper_rolename}"'))
sql_ok(i1.sql(f'drop role "{upper_rolename}"'))
acl = i1.sql(f'create role "{upper_rolename}"')
assert acl["row_count"] == 1
acl = i1.sql(f"drop role {upper_rolename}")
assert acl["row_count"] == 1
acl = i1.sql(f'create role "{upper_rolename}"')
assert acl["row_count"] == 1
acl = i1.sql(f"drop role {rolename}")
assert acl["row_count"] == 1
acl = i1.sql(f'create role "{upper_rolename}"')
assert acl["row_count"] == 1
acl = i1.sql(f'drop role "{upper_rolename}"')
assert acl["row_count"] == 1
# TODO: replace all Lua `i1.call`s to SQL `iq.sql`.
......@@ -923,82 +923,70 @@ def test_sql_acl_privileges(cluster: Cluster):
another_rolename = "ANOTHER_ROLE"
# Create users.
sql_ok(i1.sql(f"create user {username} with password '{password}'"))
sql_ok(i1.sql(f"create user {another_username} with password '{password}'"))
acl = i1.sql(f"create user {username} with password '{password}'")
assert acl["row_count"] == 1
acl = i1.sql(f"create user {another_username} with password '{password}'")
assert acl["row_count"] == 1
# Create roles.
sql_ok(i1.sql(f"create role {rolename}"))
sql_ok(i1.sql(f"create role {another_rolename}"))
acl = i1.sql(f"create role {rolename}")
assert acl["row_count"] == 1
acl = i1.sql(f"create role {another_rolename}")
assert acl["row_count"] == 1
# Create tables.
table_name = "T1"
another_table_name = "T2"
sql_ok(
i1.sql(
f"""
ddl = i1.sql(
f"""
create table {table_name} ("a" int not null, primary key ("a"))
distributed by ("a")
"""
)
)
sql_ok(
i1.sql(
f"""
assert ddl["row_count"] == 1
ddl = i1.sql(
f"""
create table {another_table_name} ("a" int not null, primary key ("a"))
distributed by ("a")
"""
)
)
assert ddl["row_count"] == 1
# Grant remote functions call.
i1.call("pico.grant_privilege", username, "execute", "universe", None)
# =========================ERRORs======================
# Attempt to grant unsupported privileges.
raises(
ReturnError,
r"Supported privileges are: \[Read, Write, Alter, Drop\]",
lambda: i1.sql(f""" grant create on table {table_name} to {username} """),
)
raises(
ReturnError,
r"Supported privileges are: \[Create, Alter, Drop\]",
lambda: i1.sql(f""" grant read user to {username} """),
)
raises(
ReturnError,
r"Supported privileges are: \[Alter, Drop\]",
lambda: i1.sql(f""" grant create on user {username} to {rolename} """),
)
raises(
ReturnError,
r"Supported privileges are: \[Create, Drop\]",
lambda: i1.sql(f""" grant alter role to {username} """),
)
raises(
ReturnError,
r"Supported privileges are: \[Drop\]",
lambda: i1.sql(f""" grant create on role {rolename} to {username} """),
)
with pytest.raises(
ReturnError, match=r"Supported privileges are: \[Read, Write, Alter, Drop\]"
):
i1.sql(f""" grant create on table {table_name} to {username} """)
with pytest.raises(
ReturnError, match=r"Supported privileges are: \[Create, Alter, Drop\]"
):
i1.sql(f""" grant read user to {username} """)
with pytest.raises(ReturnError, match=r"Supported privileges are: \[Alter, Drop\]"):
i1.sql(f""" grant create on user {username} to {rolename} """)
with pytest.raises(
ReturnError, match=r"Supported privileges are: \[Create, Drop\]"
):
i1.sql(f""" grant alter role to {username} """)
with pytest.raises(ReturnError, match=r"Supported privileges are: \[Drop\]"):
i1.sql(f""" grant create on role {rolename} to {username} """)
# Attempt to grant unexisted role.
raises(
ReturnError,
"There is no role with name SUPER",
lambda: i1.sql(f""" grant SUPER to {username} """),
)
with pytest.raises(ReturnError, match="There is no role with name SUPER"):
i1.sql(f""" grant SUPER to {username} """)
# Attempt to grant TO unexisted role.
raises(
ReturnError,
"Nor user, neither role with name SUPER exists",
lambda: i1.sql(f""" grant {rolename} to SUPER """),
)
with pytest.raises(
ReturnError, match="Nor user, neither role with name SUPER exists"
):
i1.sql(f""" grant {rolename} to SUPER """)
# Attempt to revoke unexisted role.
raises(
ReturnError,
"There is no role with name SUPER",
lambda: i1.sql(f""" revoke SUPER from {username} """),
)
with pytest.raises(ReturnError, match="There is no role with name SUPER"):
i1.sql(f""" revoke SUPER from {username} """)
# Attempt to revoke privilege that hasn't been granted yet do noting.
sql_no(i1.sql(f""" revoke read on table {table_name} from {username} """))
acl = i1.sql(f""" revoke read on table {table_name} from {username} """)
assert acl["row_count"] == 0
# TODO: Attempt to grant role that doesn't visible for user.
# TODO: Attempt to revoke role that doesn't visible for user.
# TODO: Attempt to grant TO a user that doesn't visible for user.
......@@ -1022,7 +1010,8 @@ def test_sql_acl_privileges(cluster: Cluster):
# TODO: ================USERs interaction================
# * TODO: User creation is prohibited.
# * Grant CREATE to user.
sql_ok(i1.sql(f""" grant create user to {username} """))
acl = i1.sql(f""" grant create user to {username} """)
assert acl["row_count"] == 1
# * Check privileges table is updated.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"][1]
assert privs_rows[2] == "user"
......@@ -1030,7 +1019,8 @@ def test_sql_acl_privileges(cluster: Cluster):
assert privs_rows[4] == "create"
# * TODO: User creation is available.
# * Revoke CREATE from user.
sql_ok(i1.sql(f""" revoke create user from {username} """))
acl = i1.sql(f""" revoke create user from {username} """)
assert acl["row_count"] == 1
# * Check privileges table is updated.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
assert len(privs_rows) == 1 # universe
......@@ -1039,13 +1029,15 @@ def test_sql_acl_privileges(cluster: Cluster):
# * TODO: Revoke automatically granted privileges.
# * TODO: Check ALTER and DROP are prohibited.
# * Grant global ALTER on users.
sql_ok(i1.sql(f""" grant alter user to {username} """))
acl = i1.sql(f""" grant alter user to {username} """)
assert acl["row_count"] == 1
# * Check privileges table is updated.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
assert len(privs_rows) == 2 # universe + alter user
# * TODO: Check ALTER is available.
# * Revoke global ALTER.
sql_ok(i1.sql(f""" revoke alter user from {username} """))
acl = i1.sql(f""" revoke alter user from {username} """)
assert acl["row_count"] == 1
# * TODO: Check another user can't initially interact with previously created new user.
# * TODO: Grant ALTER and DROP user privileges to another user.
......@@ -1055,7 +1047,8 @@ def test_sql_acl_privileges(cluster: Cluster):
# TODO: ================ROLEs interaction================
# * TODO: Role creation is prohibited.
# * Grant CREATE to user.
sql_ok(i1.sql(f""" grant create role to {username} """))
acl = i1.sql(f""" grant create role to {username} """)
assert acl["row_count"] == 1
# * Check privileges table is updated.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"][0]
assert privs_rows[2] == "role"
......@@ -1063,7 +1056,8 @@ def test_sql_acl_privileges(cluster: Cluster):
assert privs_rows[4] == "create"
# * TODO: Role creation is available.
# * Revoke CREATE from user.
sql_ok(i1.sql(f""" revoke create role from {username} """))
acl = i1.sql(f""" revoke create role from {username} """)
assert acl["row_count"] == 1
# * Check privileges table is updated.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
assert len(privs_rows) == 1 # universe
......@@ -1071,13 +1065,15 @@ def test_sql_acl_privileges(cluster: Cluster):
# * TODO: Revoke automatically granted privileges.
# * TODO: Check DROP are prohibited.
# * Grant global drop on role.
sql_ok(i1.sql(f""" grant drop role to {username} """))
acl = i1.sql(f""" grant drop role to {username} """)
assert acl["row_count"] == 1
# * Check privileges table is updated.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
assert len(privs_rows) == 2 # universe + drop user
# * TODO: Check DROP is available.
# * Revoke global DROP.
sql_ok(i1.sql(f""" revoke drop role from {username} """))
acl = i1.sql(f""" revoke drop role from {username} """)
assert acl["row_count"] == 1
# * TODO: Check another user can't initially interact with previously created new role.
# * TODO: Grant DROP role privileges to another user.
......@@ -1086,67 +1082,71 @@ def test_sql_acl_privileges(cluster: Cluster):
# TODO: ================TABLEs interaction===============
# ------------------READ---------------------------------
# * READ is not available.
raises(
with pytest.raises(
ReturnError,
rf"Read access to space '{table_name}' is denied for user '{username}'",
lambda: i1.sql(
f""" select * from {table_name} """, user=username, password=password
),
)
match=rf"Read access to space '{table_name}' is denied for user '{username}'",
):
i1.sql(f""" select * from {table_name} """, user=username, password=password)
# * Grant READ to user.
sql_ok(i1.sql(f""" grant read on table {table_name} to {username} """))
acl = i1.sql(f""" grant read on table {table_name} to {username} """)
assert acl["row_count"] == 1
# * Granting already granted privilege do nothing.
sql_no(i1.sql(f""" grant read on table {table_name} to {username} """))
acl = i1.sql(f""" grant read on table {table_name} to {username} """)
assert acl["row_count"] == 0
# * After grant READ succeeds.
i1.sql(f""" select * from {table_name} """, user=username, password=password)
# * Revoke READ.
sql_ok(i1.sql(f""" revoke read on table {table_name} from {username} """))
acl = i1.sql(f""" revoke read on table {table_name} from {username} """)
assert acl["row_count"] == 1
# * After revoke READ fails again.
raises(
with pytest.raises(
ReturnError,
rf"Read access to space '{table_name}' is denied for user '{username}'",
lambda: i1.sql(
f""" select * from {table_name} """, user=username, password=password
),
)
match=rf"Read access to space '{table_name}' is denied for user '{username}'",
):
i1.sql(f""" select * from {table_name} """, user=username, password=password)
# ------------------WRITE---------------------------------
# TODO: remove
sql_ok(i1.sql(f""" grant read on table {table_name} to {username} """))
acl = i1.sql(f""" grant read on table {table_name} to {username} """)
assert acl["row_count"] == 1
# * WRITE is not available.
raises(
with pytest.raises(
ReturnError,
rf"Write access to space '{table_name}' is denied for user '{username}'",
lambda: i1.sql(
match=rf"Write access to space '{table_name}' is denied for user '{username}'",
):
i1.sql(
f""" insert into {table_name} values (1) """,
user=username,
password=password,
),
)
)
# * Grant WRITE to user.
sql_ok(i1.sql(f""" grant write on table {table_name} to {username} """))
acl = i1.sql(f""" grant write on table {table_name} to {username} """)
assert acl["row_count"] == 1
# * WRITE succeeds.
i1.sql(
f""" insert into {table_name} values (1) """, user=username, password=password
)
i1.sql(f""" delete from {table_name} where "a" = 1 """)
# * Revoke WRITE from role.
sql_ok(i1.sql(f""" revoke write on table {table_name} from {username} """))
acl = i1.sql(f""" revoke write on table {table_name} from {username} """)
assert acl["row_count"] == 1
# * WRITE fails again.
raises(
with pytest.raises(
ReturnError,
rf"Write access to space '{table_name}' is denied for user '{username}'",
lambda: i1.sql(
match=rf"Write access to space '{table_name}' is denied for user '{username}'",
):
i1.sql(
f""" insert into {table_name} values (1) """,
user=username,
password=password,
),
)
)
# TODO: remove
sql_ok(i1.sql(f""" revoke read on table {table_name} from {username} """))
acl = i1.sql(f""" revoke read on table {table_name} from {username} """)
assert acl["row_count"] == 1
# ------------------CREATE---------------------------------
# * TODO: Unable to create table.
# * Grant CREATE to user.
sql_ok(i1.sql(f""" grant create table to {username} """))
acl = i1.sql(f""" grant create table to {username} """)
assert acl["row_count"] == 1
# * TODO: Creation is available.
# * TODO: Check user can do everything he wants on a table he created:
# ** READ.
......@@ -1155,29 +1155,38 @@ def test_sql_acl_privileges(cluster: Cluster):
# ** ALTER index.
# ** DROP.
# * Revoke CREATE from user.
sql_ok(i1.sql(f""" revoke create table from {username} """))
acl = i1.sql(f""" revoke create table from {username} """)
assert acl["row_count"] == 1
# * TODO: Creation is not available again.
# ------------------ALTER--------------------------------
# * TODO: Unable to create new table index.
# * Grant ALTER to user.
sql_ok(i1.sql(f""" grant alter on table {table_name} to {username} """))
acl = i1.sql(f""" grant alter on table {table_name} to {username} """)
assert acl["row_count"] == 1
# * TODO: Index creation succeeds.
# * Revoke ALTER from user.
sql_ok(i1.sql(f""" revoke alter on table {table_name} from {username} """))
acl = i1.sql(f""" revoke alter on table {table_name} from {username} """)
assert acl["row_count"] == 1
# * TODO: Attempt to remove index fails.
# ------------------DROP---------------------------------
# * TODO: Unable to drop table previously created by admin.
# * Grant DROP to user.
sql_ok(i1.sql(f""" grant drop on table {table_name} to {username} """))
acl = i1.sql(f""" grant drop on table {table_name} to {username} """)
assert acl["row_count"] == 1
# * TODO: Able to drop admin table.
# * Revoke DROP from user.
sql_ok(i1.sql(f""" revoke drop on table {table_name} from {username} """))
acl = i1.sql(f""" revoke drop on table {table_name} from {username} """)
assert acl["row_count"] == 1
# Grant global tables READ, WRITE, ALTER, DROP.
sql_ok(i1.sql(f""" grant read table to {username} """))
sql_ok(i1.sql(f""" grant write table to {username} """))
sql_ok(i1.sql(f""" grant alter table to {username} """))
sql_ok(i1.sql(f""" grant drop table to {username} """))
acl = i1.sql(f""" grant read table to {username} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" grant write table to {username} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" grant alter table to {username} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" grant drop table to {username} """)
assert acl["row_count"] == 1
# Check all operations available on another_table created by admin.
i1.sql(
f""" select * from {another_table_name} """, user=username, password=password
......@@ -1194,34 +1203,38 @@ def test_sql_acl_privileges(cluster: Cluster):
"""
)
i1.sql(f""" delete from {another_table_name} """, user=username, password=password)
sql_ok(
i1.sql(
f""" drop table {another_table_name} """, user=username, password=password
)
ddl = i1.sql(
f""" drop table {another_table_name} """, user=username, password=password
)
assert ddl["row_count"] == 1
# Revoke global privileges
sql_ok(i1.sql(f""" revoke read table from {username} """))
sql_ok(i1.sql(f""" revoke write table from {username} """))
sql_ok(i1.sql(f""" revoke alter table from {username} """))
sql_ok(i1.sql(f""" revoke drop table from {username} """))
acl = i1.sql(f""" revoke read table from {username} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" revoke write table from {username} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" revoke alter table from {username} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" revoke drop table from {username} """)
assert acl["row_count"] == 1
# ================ROLE passing================
# * Check there are no privileges granted to anything initially.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
assert len(privs_rows) == 1 # universe
# * Read from table is prohibited for user initially.
raises(
with pytest.raises(
ReturnError,
rf"Read access to space '{table_name}' is denied for user '{username}'",
lambda: i1.sql(
f""" select * from {table_name} """, user=username, password=password
),
)
match=rf"Read access to space '{table_name}' is denied for user '{username}'",
):
i1.sql(f""" select * from {table_name} """, user=username, password=password)
# * Grant table READ and WRITE to role.
sql_ok(i1.sql(f""" grant read on table {table_name} to {rolename} """))
sql_ok(i1.sql(f""" grant write on table {table_name} to {rolename} """))
acl = i1.sql(f""" grant read on table {table_name} to {rolename} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" grant write on table {table_name} to {rolename} """)
assert acl["row_count"] == 1
# * Grant ROLE to user.
sql_ok(i1.sql(f""" grant {rolename} to {username} """))
acl = i1.sql(f""" grant {rolename} to {username} """)
assert acl["row_count"] == 1
# * Check read and write is available for user.
i1.sql(f""" select * from {table_name} """, user=username, password=password)
i1.sql(
......@@ -1229,19 +1242,19 @@ def test_sql_acl_privileges(cluster: Cluster):
)
i1.sql(f""" delete from {table_name} where "a" = 1 """)
# * Revoke privileges from role.
sql_ok(i1.sql(f""" revoke write on table {table_name} from {rolename} """))
sql_ok(i1.sql(f""" revoke read on table {table_name} from {rolename} """))
acl = i1.sql(f""" revoke write on table {table_name} from {rolename} """)
assert acl["row_count"] == 1
acl = i1.sql(f""" revoke read on table {table_name} from {rolename} """)
assert acl["row_count"] == 1
# * Check privilege revoked from role and user.
privs_rows = i1.sql(""" select * from "_pico_privilege" """)["rows"]
assert len(privs_rows) == 2 # universe, role for user
# * Check read is prohibited again.
raises(
with pytest.raises(
ReturnError,
rf"Read access to space '{table_name}' is denied for user '{username}'",
lambda: i1.sql(
f""" select * from {table_name} """, user=username, password=password
),
)
match=rf"Read access to space '{table_name}' is denied for user '{username}'",
):
i1.sql(f""" select * from {table_name} """, user=username, password=password)
def test_distributed_sql_via_set_language(cluster: Cluster):
......@@ -1296,113 +1309,132 @@ def test_sql_privileges(cluster: Cluster):
table_name = "t"
# Create a test table
sql_ok(
i1.sql(
f"""
ddl = i1.sql(
f"""
create table "{table_name}" ("a" int not null, "b" int, primary key ("a"))
using memtx
distributed by ("a")
option (timeout = 3)
"""
)
)
assert ddl["row_count"] == 1
username = "alice"
alice_pwd = "1234567890"
# Create user with execute on universe privilege
sql_ok(
i1.sql(
f"""
acl = i1.sql(
f"""
create user "{username}" with password '{alice_pwd}'
using chap-sha1 option (timeout = 3)
"""
)
)
assert acl["row_count"] == 1
i1.eval(f""" pico.grant_privilege("{username}", "execute", "universe") """)
# ------------------------
# Check SQL read privilege
# ------------------------
raises(
ReturnError,
f"AccessDenied: Read access to space '{table_name}'",
lambda: i1.sql(f""" select * from "{table_name}" """, user=username, password=alice_pwd),
)
with pytest.raises(
ReturnError, match=f"AccessDenied: Read access to space '{table_name}'"
):
i1.sql(f""" select * from "{table_name}" """, user=username, password=alice_pwd)
# Grant read privilege
i1.eval(f""" pico.grant_privilege("{username}", "read", "space", "{table_name}") """)
dql = i1.sql(f""" select * from "{table_name}" """, user=username, password=alice_pwd)
i1.eval(
f""" pico.grant_privilege("{username}", "read", "space", "{table_name}") """
)
dql = i1.sql(
f""" select * from "{table_name}" """, user=username, password=alice_pwd
)
assert dql["rows"] == []
# Revoke read privilege
i1.eval(f""" pico.revoke_privilege("{username}", "read", "space", "{table_name}") """)
i1.eval(
f""" pico.revoke_privilege("{username}", "read", "space", "{table_name}") """
)
# -------------------------
# Check SQL write privilege
# -------------------------
raises(
ReturnError,
f"AccessDenied: Write access to space '{table_name}'",
lambda: i1.sql(f""" insert into "{table_name}" values (1, 2) """, user=username, password=alice_pwd)
)
with pytest.raises(
ReturnError, match=f"AccessDenied: Write access to space '{table_name}'"
):
i1.sql(
f""" insert into "{table_name}" values (1, 2) """,
user=username,
password=alice_pwd,
)
# Grant write privilege
i1.eval(f""" pico.grant_privilege("{username}", "write", "space", "{table_name}") """)
sql_ok(i1.sql(
f""" insert into "{table_name}" values (1, 2) """, user=username, password=alice_pwd
))
i1.eval(
f""" pico.grant_privilege("{username}", "write", "space", "{table_name}") """
)
dml = i1.sql(
f""" insert into "{table_name}" values (1, 2) """,
user=username,
password=alice_pwd,
)
assert dml["row_count"] == 1
# Revoke write privilege
i1.eval(f""" pico.revoke_privilege("{username}", "write", "space", "{table_name}") """)
i1.eval(
f""" pico.revoke_privilege("{username}", "write", "space", "{table_name}") """
)
# -----------------------------------
# Check SQL write and read privileges
# -----------------------------------
raises(
ReturnError,
f"AccessDenied: Read access to space '{table_name}'",
lambda: i1.sql(
with pytest.raises(
ReturnError, match=f"AccessDenied: Read access to space '{table_name}'"
):
i1.sql(
f""" insert into "{table_name}" select "a" + 1, "b" from "{table_name}" """,
user=username,
password=alice_pwd,
)
)
raises(
ReturnError,
f"AccessDenied: Read access to space '{table_name}'",
lambda: i1.sql(f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd)
)
raises(
ReturnError,
f"AccessDenied: Read access to space '{table_name}'",
lambda: i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
)
with pytest.raises(
ReturnError, match=f"AccessDenied: Read access to space '{table_name}'"
):
i1.sql(
f""" update "{table_name}" set "b" = 42 """,
user=username,
password=alice_pwd,
)
with pytest.raises(
ReturnError, match=f"AccessDenied: Read access to space '{table_name}'"
):
i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
# Grant read privilege
i1.eval(f""" pico.grant_privilege("{username}", "read", "space", "{table_name}") """)
i1.eval(
f""" pico.grant_privilege("{username}", "read", "space", "{table_name}") """
)
raises(
ReturnError,
f"AccessDenied: Write access to space '{table_name}'",
lambda: i1.sql(
with pytest.raises(
ReturnError, match=f"AccessDenied: Write access to space '{table_name}'"
):
i1.sql(
f""" insert into "{table_name}" select "a" + 1, "b" from "{table_name}" """,
user=username,
password=alice_pwd,
)
)
raises(
ReturnError,
f"AccessDenied: Write access to space '{table_name}'",
lambda: i1.sql(f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd)
)
raises(
ReturnError,
f"AccessDenied: Write access to space '{table_name}'",
lambda: i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
)
with pytest.raises(
ReturnError, match=f"AccessDenied: Write access to space '{table_name}'"
):
i1.sql(
f""" update "{table_name}" set "b" = 42 """,
user=username,
password=alice_pwd,
)
with pytest.raises(
ReturnError, match=f"AccessDenied: Write access to space '{table_name}'"
):
i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
# Grant write privilege
i1.eval(f""" pico.grant_privilege("{username}", "write", "space", "{table_name}") """)
i1.eval(
f""" pico.grant_privilege("{username}", "write", "space", "{table_name}") """
)
dml = i1.sql(
f""" insert into "{table_name}" select "a" + 1, "b" from "{table_name}" """,
......@@ -1410,7 +1442,9 @@ def test_sql_privileges(cluster: Cluster):
password=alice_pwd,
)
assert dml["row_count"] == 1
dml = i1.sql(f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd)
dml = i1.sql(
f""" update "{table_name}" set "b" = 42 """, user=username, password=alice_pwd
)
assert dml["row_count"] == 2
dml = i1.sql(f""" delete from "{table_name}" """, user=username, password=alice_pwd)
assert dml["row_count"] == 2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment