diff --git a/test/int/test_audit.py b/test/int/test_audit.py index fd51283f8fc8e442d6ecc5c72b3854114caecbe0..716d580243eb91b3e28723307a3c7b333a9f4804 100644 --- a/test/int/test_audit.py +++ b/test/int/test_audit.py @@ -176,11 +176,11 @@ class EventGrantPrivilege(Event): class EventRevokePrivilege(Event): TITLE: ClassVar[str] = "revoke_privilege" privilege: str - object: str object_type: str grantee: str grantee_type: str initiator: str + object: Optional[str] = None @dataclass @@ -592,3 +592,202 @@ def test_access_denied(instance: Instance): assert access_denied.object_type == "role" assert access_denied.object_name == "R" assert access_denied.initiator == "ymir" + + +def test_grant_revoke(instance: Instance): + instance.start() + + user = "ymir" + password = "12341234" + + instance.create_user(with_name=user, with_password=password) + + instance.sudo_sql(f'GRANT CREATE ROLE TO "{user}"') + + audit = AuditFile(instance.audit_flag_value) + for _ in audit.events(): + pass + + events = audit.events() + + # wildcard privilege to user + instance.sudo_sql(f'GRANT CREATE TABLE TO "{user}"') + + grant_privilege = take_until_type(events, EventGrantPrivilege) + + assert grant_privilege is not None + assert grant_privilege.message == "granted privilege create on table to user `ymir`" + assert grant_privilege.severity == Severity.High + assert grant_privilege.privilege == "create" + assert grant_privilege.object_type == "table" + assert grant_privilege.grantee == user + assert grant_privilege.grantee_type == "user" + assert grant_privilege.initiator == "admin" + assert grant_privilege.object is None + + instance.sudo_sql(f'REVOKE CREATE TABLE FROM "{user}"') + + revoke_privilege = take_until_type(events, EventRevokePrivilege) + + assert revoke_privilege is not None + assert ( + revoke_privilege.message + == f"revoked privilege create on table from user `{user}`" + ) + assert revoke_privilege.severity == Severity.High + assert revoke_privilege.privilege == "create" + assert revoke_privilege.object_type == "table" + assert revoke_privilege.grantee == user + assert revoke_privilege.grantee_type == "user" + assert revoke_privilege.initiator == "admin" + assert revoke_privilege.object is None + + # specific privilege to user + instance.sudo_sql(f'GRANT READ ON TABLE "_pico_tier" TO "{user}"') + + grant_privilege = take_until_type(events, EventGrantPrivilege) + + assert grant_privilege is not None + assert ( + grant_privilege.message + == f"granted privilege read on table `_pico_tier` to user `{user}`" + ) + assert grant_privilege.severity == Severity.High + assert grant_privilege.privilege == "read" + assert grant_privilege.object_type == "table" + assert grant_privilege.grantee == user + assert grant_privilege.grantee_type == "user" + assert grant_privilege.initiator == "admin" + assert grant_privilege.object == "_pico_tier" + + instance.sudo_sql(f'REVOKE READ ON TABLE "_pico_tier" FROM "{user}"') + + revoke_privilege = take_until_type(events, EventRevokePrivilege) + + assert revoke_privilege is not None + assert ( + revoke_privilege.message + == f"revoked privilege read on table `_pico_tier` from user `{user}`" + ) + assert revoke_privilege.severity == Severity.High + assert revoke_privilege.privilege == "read" + assert revoke_privilege.object_type == "table" + assert revoke_privilege.grantee == user + assert revoke_privilege.grantee_type == "user" + assert revoke_privilege.initiator == "admin" + assert revoke_privilege.object == "_pico_tier" + + instance.sql('CREATE ROLE "R"', user=user, password=password) + + # wildcard privilege to role + instance.sudo_sql('GRANT CREATE TABLE TO "R"') + + grant_privilege = take_until_type(events, EventGrantPrivilege) + + assert grant_privilege is not None + assert grant_privilege.message == "granted privilege create on table to role `R`" + assert grant_privilege.severity == Severity.High + assert grant_privilege.privilege == "create" + assert grant_privilege.object_type == "table" + assert grant_privilege.grantee == "R" + assert grant_privilege.grantee_type == "role" + assert grant_privilege.initiator == "admin" + assert grant_privilege.object is None + + instance.sudo_sql('REVOKE CREATE TABLE FROM "R"') + + revoke_privilege = take_until_type(events, EventRevokePrivilege) + + assert revoke_privilege is not None + assert revoke_privilege.message == "revoked privilege create on table from role `R`" + assert revoke_privilege.severity == Severity.High + assert revoke_privilege.privilege == "create" + assert revoke_privilege.object_type == "table" + assert revoke_privilege.grantee == "R" + assert revoke_privilege.grantee_type == "role" + assert revoke_privilege.initiator == "admin" + assert revoke_privilege.object is None + + # specific privilege to role + instance.sudo_sql('GRANT READ ON TABLE "_pico_user" TO "R"') + + grant_privilege = take_until_type(events, EventGrantPrivilege) + + assert grant_privilege is not None + assert ( + grant_privilege.message + == "granted privilege read on table `_pico_user` to role `R`" + ) + assert grant_privilege.severity == Severity.High + assert grant_privilege.privilege == "read" + assert grant_privilege.object_type == "table" + assert grant_privilege.grantee == "R" + assert grant_privilege.grantee_type == "role" + assert grant_privilege.initiator == "admin" + assert grant_privilege.object == "_pico_user" + + instance.sudo_sql('REVOKE READ ON TABLE "_pico_user" FROM "R"') + + revoke_privilege = take_until_type(events, EventRevokePrivilege) + + assert revoke_privilege is not None + assert ( + revoke_privilege.message + == "revoked privilege read on table `_pico_user` from role `R`" + ) + assert revoke_privilege.severity == Severity.High + assert revoke_privilege.privilege == "read" + assert revoke_privilege.object_type == "table" + assert revoke_privilege.grantee == "R" + assert revoke_privilege.grantee_type == "role" + assert revoke_privilege.initiator == "admin" + assert revoke_privilege.object == "_pico_user" + + # role to user + instance.sql('GRANT "R" TO "ymir"', user=user, password=password) + + grant_role = take_until_type(events, EventGrantRole) + assert grant_role is not None + assert grant_role.message == "granted role `R` to user `ymir`" + assert grant_role.severity == Severity.High + assert grant_role.role == "R" + assert grant_role.grantee == "ymir" + assert grant_role.grantee_type == "user" + assert grant_role.initiator == "ymir" + + instance.sql(f'REVOKE "R" FROM "{user}"', user=user, password=password) + + revoke_role = take_until_type(events, EventRevokeRole) + + assert revoke_role is not None + assert revoke_role.message == f"revoked role `R` from user `{user}`" + assert revoke_role.severity == Severity.High + assert revoke_role.role == "R" + assert revoke_role.grantee == user + assert revoke_role.grantee_type == "user" + assert revoke_role.initiator == user + + # one role to another role + instance.sql('CREATE ROLE "R2"', user=user, password=password) + instance.sql('GRANT "R" TO "R2"', user=user, password=password) + + grant_role = take_until_type(events, EventGrantRole) + assert grant_role is not None + assert grant_role.message == "granted role `R` to role `R2`" + assert grant_role.severity == Severity.High + assert grant_role.role == "R" + assert grant_role.grantee == "R2" + assert grant_role.grantee_type == "role" + assert grant_role.initiator == "ymir" + + instance.sql('REVOKE "R" FROM "R2"', user=user, password=password) + + revoke_role = take_until_type(events, EventRevokeRole) + + assert revoke_role is not None + assert revoke_role.message == "revoked role `R` from role `R2`" + assert revoke_role.severity == Severity.High + assert revoke_role.role == "R" + assert revoke_role.grantee == "R2" + assert revoke_role.grantee_type == "role" + assert revoke_role.initiator == user