-
Егор Ивков authoredЕгор Ивков authored
access_control.rs 55.39 KiB
//! Access control
//!
//! This module implements access control checks according to picodata access control model.
//!
//! Currently objects we control access to include tables, users and roles. The model closely follows one from
//! vanilla tarantool with some deviations. We build on top of tarantool infrastructure, i e in vanilla tarantool
//! permissions are represented with bit masks for each pair of object and user (for more context see `user_def.h`).
//! So we do not reimplement it in picodata, instead we have two functions exported from vanilla tarantool named
//! `box_access_check_space` and `box_access_check_ddl`. The first one as the name suggests implements checks for
//! DML operations on spaces, the second one allows to check remaining permissions with exception of some edge cases
//! that are handled separately in `access_check_grant_revoke` below. `access_check_grant_revoke` closely follows
//! `priv_def_check` from `alter.cc`. In vanilla ddl permission checks are performed via triggers attached to system spaces.
//! for example `on_replace_dd_space` or `on_replace_dd_priv`. Checks implemented in this module are 1:1 mapping of those.
//!
//! Since picodata is a distributed database we perform clusterwide operations in a coordinated fashion using compare and swap
//! building block. So every clusterwide operation has to go through raft leader that takes care of safely distributing this
//! operation across all instances. Because of that we have a distinction, privileges that are related to operations that change
//! state of the cluster (e g via creating users, new tables, writing to global tables) are checked before going forward with CaS
//! operation. On the other size permissions that do not change state of the cluster (reading and writing to sharded tables)
//! are handled by our sql processing logic in `sql.rs`.
//!
//! Executing CaS operation involves coordination between nodes of the cluster, since cluster nodes authenticate with each other
//! without involving end user credentials we need to pass the user who initiated the request along with the request to raft
//! leader so it can perform the access check. In order for this to work properly node that initiates the request needs to access
//! certain system spaces (see [`crate::cas::compare_and_swap`] for details). Connected users do not have such a permission from vanilla
//! tarantool perspective so we bypass vanilla checks by using `box.session.su(ADMIN)`. Then on the raft leader box.session.su
//! is used again to switch to user provided in the request. This is needed because tarantool functions we use for access checks
//! make them based on effective user.
use std::{
collections::{HashMap, HashSet},
fmt::Display,
};
use tarantool::{
access_control::{
box_access_check_ddl, box_access_check_space, PrivType,
SchemaObjectType as TntSchemaObjectType,
},
error::BoxError,
session::{self, UserId},
space::{Space, SystemSpace},
tuple::Encode,
};
use crate::storage::ClusterwideTable;
use crate::traft::op::Dml;
use crate::{
schema::{
PrivilegeDef, PrivilegeType, SchemaObjectType as PicoSchemaObjectType, ADMIN_ID,
PICO_SERVICE_ID, PICO_SERVICE_USER_NAME,
},
storage::{make_routine_not_found, space_by_id, Clusterwide, ToEntryIter},
traft::{
self,
op::{self, Op},
},
};
tarantool::define_str_enum! {
pub enum UserMetadataKind {
User = "user",
Role = "role",
}
}
/// User metadata. Represents a tuple of a system `_user` space.
/// TODO move to tarantool-module along with some space specific methods from storage.rs
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct UserMetadata {
pub id: UserId,
pub owner_id: UserId,
pub name: String,
#[serde(rename = "type")]
pub ty: UserMetadataKind,
pub auth: HashMap<String, String>,
// Note: no way to see the real type of this thing.
// It is an undocumented enterprise edition only field.
// For msgpack encoding to match needs to be a sequence
pub auth_history: [(); 0],
pub last_modified: usize,
}
impl Encode for UserMetadata {}
fn make_no_such_user(name: &str) -> tarantool::error::Error {
tarantool::set_error!(
tarantool::error::TarantoolErrorCode::NoSuchUser,
"no user named {}",
name
);
tarantool::error::TarantoolError::last().into()
}
/// The function produces an error that has the same format as one that is generated
/// by vanilla tarantool. See original definition in errcode.h
fn make_access_denied(
access_name: impl Display,
object_type: PicoSchemaObjectType,
object_name: impl Display,
user_name: impl Display,
) -> tarantool::error::Error {
tarantool::set_error!(
tarantool::error::TarantoolErrorCode::AccessDenied,
"{} to {} '{}' is denied for user '{}'",
access_name,
object_type,
object_name,
user_name
);
return tarantool::error::TarantoolError::last().into();
}
pub fn user_by_id(id: UserId) -> tarantool::Result<UserMetadata> {
let sys_user = Space::from(SystemSpace::User).get(&(id,))?;
match sys_user {
Some(u) => u.decode(),
None => {
tarantool::set_error!(
tarantool::error::TarantoolErrorCode::NoSuchUser,
"no such user #{}",
id
);
return Err(tarantool::error::TarantoolError::last().into());
}
}
}
/// There are no cases when box_access_check_ddl is called several times
/// in a row so it is ok that we need to switch once to user who initiated the request
/// This wrapper is needed because usually before checking permissions we need to
/// retrieve some metadata about the object being tested for access and this requires
/// access to system spaces which original user is not required to have.
///
/// # Panicking
///
/// Note that not all combinations of parameters are valid.
/// For in depth description of cases when this function may panic see
/// [box_access_check_ddl(box_access_check_ddl) in tarantool module.
fn box_access_check_ddl_as_user(
object_name: &str,
object_id: u32,
owner_id: u32,
object_type: TntSchemaObjectType,
access: PrivType,
as_user: UserId,
) -> tarantool::Result<()> {
let _su = session::su(as_user)?;
box_access_check_ddl(object_name, object_id, owner_id, object_type, access)
}
fn access_check_dml(dml: &op::Dml, as_user: UserId) -> tarantool::Result<()> {
let _su = session::su(as_user)?;
box_access_check_space(dml.space(), PrivType::Write)
}
/// This function performs access control checks that are identical to ones performed in
/// vanilla tarantool in on_replace_dd_space, on_replace_dd_index and on_replace_dd_func respectively
fn access_check_ddl(ddl: &op::Ddl, as_user: UserId) -> tarantool::Result<()> {
match ddl {
op::Ddl::CreateTable {
id, name, owner, ..
} => {
assert_eq!(
*owner, as_user,
"when creating objects creator is the owner"
);
box_access_check_ddl_as_user(
name,
*id,
*owner,
TntSchemaObjectType::Space,
PrivType::Create,
as_user,
)
}
op::Ddl::DropTable { id, .. } => {
let space = space_by_id(*id)?;
let meta = space.meta()?;
box_access_check_ddl_as_user(
&space.meta()?.name,
*id,
meta.user_id,
TntSchemaObjectType::Space,
PrivType::Drop,
as_user,
)
}
op::Ddl::CreateIndex { space_id, .. } => {
let space = space_by_id(*space_id)?;
let meta = space.meta()?;
box_access_check_ddl_as_user(
&meta.name,
*space_id,
meta.user_id,
TntSchemaObjectType::Space,
PrivType::Create,
as_user,
)
}
op::Ddl::DropIndex { space_id, .. } => {
let space = space_by_id(*space_id)?;
let meta = space.meta()?;
box_access_check_ddl_as_user(
&meta.name,
*space_id,
meta.user_id,
TntSchemaObjectType::Space,
PrivType::Drop,
as_user,
)
}
op::Ddl::CreateProcedure {
id, name, owner, ..
} => box_access_check_ddl_as_user(
name,
*id,
*owner,
TntSchemaObjectType::Function,
PrivType::Create,
as_user,
),
op::Ddl::DropProcedure { id, .. } => {
let node = traft::node::global().expect("node must be already initialized");
let Some(routine) = node.storage.routines.by_id(*id)? else {
tarantool::set_error!(
tarantool::error::TarantoolErrorCode::NoSuchProc,
"no such procedure #{}",
id
);
return Err(tarantool::error::TarantoolError::last().into());
};
box_access_check_ddl_as_user(
&routine.name,
*id,
routine.owner,
TntSchemaObjectType::Function,
PrivType::Drop,
as_user,
)
}
op::Ddl::RenameProcedure {
routine_id,
old_name,
owner_id,
..
} => box_access_check_ddl_as_user(
old_name,
*routine_id,
*owner_id,
TntSchemaObjectType::Function,
PrivType::Alter,
as_user,
),
}
}
/// This is port of vanilla tarantool function box/user.cc::role_check.
fn detect_role_grant_cycles(
granted_role: &UserMetadata,
priv_def: &PrivilegeDef,
storage: &Clusterwide,
) -> tarantool::Result<()> {
let grantee_id = priv_def.grantee_id();
let grantee_name = {
let grantee = user_by_id(grantee_id)?;
grantee.name
};
let role_id_to_grantees_id = {
let mut hm = HashMap::new();
for privilege in storage.privileges.iter()? {
if privilege.object_type() == PicoSchemaObjectType::Role
&& privilege.privilege() == PrivilegeType::Execute
{
match privilege.object_id() {
Some(object_id) => {
hm.entry(object_id as i64)
.or_insert(Vec::new())
.push(privilege.grantee_id() as i64);
}
None => continue,
}
}
}
hm
};
let mut visited = HashSet::new();
visited.insert(grantee_id as i64);
let mut current_layer = visited.clone();
// It's BFS with start from grantee_id
while !current_layer.is_empty() {
let mut next_layer = HashSet::new();
for role_id in current_layer.iter() {
let parents = role_id_to_grantees_id.get(role_id);
if let Some(parents) = parents {
next_layer.extend(parents.iter());
}
}
visited.extend(next_layer.iter());
current_layer = next_layer;
}
if visited.contains(&(granted_role.id as i64)) {
let err = BoxError::new(
tarantool::error::TarantoolErrorCode::RoleLoop,
format!(
"Granting role {} to role {} would create a loop",
granted_role.name, grantee_name,
),
);
return Err(err.into());
}
Ok(())
}
fn access_check_grant_revoke(
storage: &Clusterwide,
priv_def: &PrivilegeDef,
grantor_id: UserId,
access: PrivType,
access_name: &str,
) -> tarantool::Result<()> {
// Note: this is vanilla tarantool user, not picodata one.
let grantor = user_by_id(grantor_id)?;
let object_id = match priv_def.object_id() {
None => {
// This is a wildcard grant on entire entity type i e grant on all spaces.
// In vanilla tarantool each object type has counterpart with entity suffix
// i e BOX_SC_ENTITY_SPACE. Since these variants are not stored we do
// not materialize them as valid SchemaObjectType variants and streamline this check.
if grantor_id != ADMIN_ID {
return Err(make_access_denied(
access_name,
priv_def.object_type(),
"",
grantor.name,
));
}
return Ok(());
}
Some(object_id) => object_id,
};
// Nobody should be able to revoke privileges from pico_service
// As it would break the cluster permanently.
if access == PrivType::Revoke && priv_def.grantee_id() == PICO_SERVICE_ID {
tarantool::set_error!(
tarantool::error::TarantoolErrorCode::AccessDenied,
"Revoke '{}' from '{}' is denied for all users",
priv_def.privilege(),
PICO_SERVICE_USER_NAME,
);
return Err(tarantool::error::TarantoolError::last().into());
}
match priv_def.object_type() {
PicoSchemaObjectType::Universe => {
if priv_def.privilege() != PrivilegeType::Login {
// This assumption is used in the following checks
crate::warn_or_panic!(
"Only Login privilege can be granted on Universe, got {}",
priv_def.privilege()
);
return Err(make_access_denied(
access_name,
PicoSchemaObjectType::Universe,
"Only Login privilege can be granted on Universe",
grantor.name,
));
}
// Following checks are assuming that it's a grant or revoke of Login privilege
let target_sys_user = user_by_id(priv_def.grantee_id())?;
if target_sys_user.ty != UserMetadataKind::User {
return Err(make_no_such_user(&target_sys_user.name));
}
// Only owner or admin can grant login on user
if target_sys_user.owner_id != grantor_id && grantor_id != ADMIN_ID {
tarantool::set_error!(
tarantool::error::TarantoolErrorCode::AccessDenied,
"{} Login from '{}' is denied for {}",
access_name,
target_sys_user.name,
grantor.name
);
return Err(tarantool::error::TarantoolError::last().into());
}
}
PicoSchemaObjectType::Table => {
let space = space_by_id(object_id)?;
let meta = space.meta()?;
assert_eq!(object_id, meta.id, "user metadata id mismatch");
// Only owner or admin can grant on space
if meta.user_id != grantor_id && grantor_id != ADMIN_ID {
return Err(make_access_denied(
access_name,
PicoSchemaObjectType::Table,
meta.name,
grantor.name,
));
}
return box_access_check_ddl_as_user(
&meta.name,
meta.id,
grantor_id,
TntSchemaObjectType::Space,
access,
grantor_id,
);
}
PicoSchemaObjectType::Role => {
let granted_role = user_by_id(object_id)?;
assert_eq!(object_id, granted_role.id, "user metadata id mismatch");
// Only the creator of the role or admin can grant or revoke it.
// Everyone can grant 'PUBLIC' role.
// Note that having a role means having execute privilege on it.
if granted_role.owner_id != grantor_id
&& grantor_id != ADMIN_ID
&& !(granted_role.name == "public"
&& priv_def.privilege() == PrivilegeType::Execute)
{
return Err(make_access_denied(
access_name,
PicoSchemaObjectType::Role,
granted_role.name,
grantor.name,
));
}
detect_role_grant_cycles(&granted_role, priv_def, storage)?;
return box_access_check_ddl_as_user(
&granted_role.name,
granted_role.id,
grantor_id,
TntSchemaObjectType::Role,
access,
grantor_id,
);
}
PicoSchemaObjectType::User => {
let target_sys_user = user_by_id(object_id)?;
if target_sys_user.ty != UserMetadataKind::User {
return Err(make_no_such_user(&target_sys_user.name));
}
// Only owner or admin can grant on user
if target_sys_user.owner_id != grantor_id && grantor_id != ADMIN_ID {
return Err(make_access_denied(
access_name,
PicoSchemaObjectType::User,
target_sys_user.name,
grantor.name,
));
}
return box_access_check_ddl_as_user(
&target_sys_user.name,
target_sys_user.id,
grantor_id,
TntSchemaObjectType::User,
access,
grantor_id,
);
}
PicoSchemaObjectType::Routine => {
let routine = storage
.routines
.by_id(object_id)?
.ok_or_else(|| make_routine_not_found(object_id))?;
// Only owner or admin can grant on routine.
if routine.owner != grantor_id && grantor_id != ADMIN_ID {
return Err(make_access_denied(
access_name,
PicoSchemaObjectType::Routine,
&routine.name,
grantor.name,
));
}
return box_access_check_ddl_as_user(
&routine.name,
routine.id,
grantor_id,
TntSchemaObjectType::Function,
access,
grantor_id,
);
}
}
Ok(())
}
fn access_check_acl(
storage: &Clusterwide,
acl: &op::Acl,
as_user: UserId,
) -> tarantool::Result<()> {
match acl {
op::Acl::CreateUser { user_def } => {
assert_eq!(
user_def.owner, as_user,
"when creating objects creator is the owner"
);
box_access_check_ddl_as_user(
&user_def.name,
user_def.id,
user_def.owner,
TntSchemaObjectType::User,
PrivType::Create,
as_user,
)
}
op::Acl::ChangeAuth { user_id, .. } | op::Acl::RenameUser { user_id, .. } => {
let sys_user = user_by_id(*user_id)?;
assert_eq!(sys_user.id, *user_id, "user metadata id mismatch");
box_access_check_ddl_as_user(
&sys_user.name,
sys_user.id,
sys_user.owner_id,
TntSchemaObjectType::User,
PrivType::Alter,
as_user,
)
}
op::Acl::DropUser { user_id, .. } => {
let sys_user = user_by_id(*user_id)?;
assert_eq!(sys_user.id, *user_id, "user metadata id mismatch");
box_access_check_ddl_as_user(
&sys_user.name,
sys_user.id,
sys_user.owner_id,
TntSchemaObjectType::User,
PrivType::Drop,
as_user,
)
}
op::Acl::CreateRole { role_def } => {
assert_eq!(
role_def.owner, as_user,
"when creating objects creator is the owner"
);
box_access_check_ddl_as_user(
&role_def.name,
role_def.id,
role_def.owner,
TntSchemaObjectType::Role,
PrivType::Create,
as_user,
)
}
op::Acl::DropRole { role_id, .. } => {
// In vanilla roles and users are stored in the same space
// so we can reuse the definition
let sys_user = user_by_id(*role_id)?;
assert_eq!(sys_user.id, *role_id, "user metadata id mismatch");
box_access_check_ddl_as_user(
&sys_user.name,
sys_user.id,
sys_user.owner_id,
TntSchemaObjectType::Role,
PrivType::Drop,
as_user,
)
}
op::Acl::GrantPrivilege { priv_def } => {
access_check_grant_revoke(storage, priv_def, as_user, PrivType::Grant, "Grant")
}
op::Acl::RevokePrivilege { priv_def, .. } => {
access_check_grant_revoke(storage, priv_def, as_user, PrivType::Revoke, "Revoke")
}
}
}
pub(super) fn access_check_op(
storage: &Clusterwide,
op: &op::Op,
as_user: UserId,
) -> tarantool::Result<()> {
match op {
Op::Nop => Ok(()),
Op::Dml(dml) => access_check_dml(dml, as_user),
Op::BatchDml { ops } => {
for op in ops {
access_check_dml(op, as_user)?;
}
Ok(())
}
Op::DdlPrepare { ddl, .. } => access_check_ddl(ddl, as_user),
Op::DdlCommit | Op::DdlAbort => {
if as_user != ADMIN_ID {
let sys_user = user_by_id(as_user)?;
return Err(make_access_denied(
"ddl",
PicoSchemaObjectType::Universe,
"",
sys_user.name,
));
}
Ok(())
}
Op::PluginEnable { .. }
| Op::PluginConfigUpdate { .. }
| Op::PluginDisable { .. }
| Op::PluginRemove { .. }
| Op::PluginUpdateTopology { .. } => {
// FIXME: currently access here is not checked explicitly, but check
// dml into system space _pico_property.
// Same behaviour also using for check access of update and remove plugin operations.
// This will be fixed later by adding special rights for plugin system.
access_check_dml(
&Dml::replace(ClusterwideTable::Property, &(), as_user).expect("infallible"),
as_user,
)
}
Op::Acl(acl) => access_check_acl(storage, acl, as_user),
}
}
mod tests {
use std::collections::HashMap;
use super::{access_check_acl, access_check_ddl, user_by_id};
use crate::{
access_control::{access_check_op, UserMetadataKind},
schema::{
Distribution, PrivilegeDef, PrivilegeType, SchemaObjectType, UserDef, ADMIN_ID,
UNIVERSE_ID,
},
storage::{
acl::{
global_create_role, global_grant_privilege, on_master_create_role,
on_master_create_user, on_master_grant_privilege, on_master_revoke_privilege,
},
Clusterwide,
},
traft::op::{Acl, Ddl, Dml, Op},
};
use tarantool::{
auth::{AuthData, AuthDef, AuthMethod},
session::{self, UserId},
space::{Space, SpaceCreateOptions, SpaceEngineType},
tuple::{Tuple, TupleBuffer},
};
static mut NEXT_USER_ID: u32 = 42;
fn next_user_id() -> u32 {
// SAFETY: tests are always run on tx thread sequentially
unsafe {
NEXT_USER_ID += 1;
NEXT_USER_ID
}
}
#[tarantool::test]
fn decode_user_metadata() {
let sys_user = user_by_id(ADMIN_ID).unwrap();
assert_eq!(sys_user.id, ADMIN_ID);
assert_eq!(sys_user.owner_id, ADMIN_ID);
assert_eq!(&sys_user.name, "admin");
assert_eq!(sys_user.ty, UserMetadataKind::User);
assert_eq!(sys_user.auth, HashMap::new());
assert_eq!(sys_user.auth_history, []);
assert_eq!(sys_user.last_modified, 0);
}
fn dummy_auth_def() -> AuthDef {
AuthDef::new(
AuthMethod::ChapSha1,
AuthData::new(&AuthMethod::ChapSha1, "", "").into_string(),
)
}
fn dummy_user_def(id: UserId, name: String, owner: Option<UserId>) -> UserDef {
UserDef {
id,
name,
schema_version: 0,
auth: Some(dummy_auth_def()),
owner: owner.unwrap_or_else(|| session::uid().unwrap()),
ty: UserMetadataKind::User,
}
}
#[track_caller]
fn make_user(name: &str, owner: Option<UserId>) -> u32 {
let id = next_user_id();
let user_def = dummy_user_def(id, name.to_owned(), owner);
on_master_create_user(&user_def, true).unwrap();
id
}
#[track_caller]
fn grant(
storage: &Clusterwide,
privilege: PrivilegeType,
object_type: SchemaObjectType,
object_id: i64,
grantee_id: UserId,
grantor_id: Option<UserId>,
) {
let priv_def = PrivilegeDef::new(
privilege,
object_type,
object_id,
grantee_id,
grantor_id.unwrap_or(session::uid().unwrap()),
0,
)
.expect("must be valid");
access_check_op(
storage,
&Op::Acl(Acl::GrantPrivilege {
priv_def: priv_def.clone(),
}),
priv_def.grantor_id(),
)
.unwrap();
on_master_grant_privilege(&priv_def).unwrap();
}
#[track_caller]
fn revoke(
storage: &Clusterwide,
grantee_id: UserId,
privilege: PrivilegeType,
object_type: SchemaObjectType,
object_id: i64,
) {
let priv_def = PrivilegeDef::new(
privilege,
object_type,
object_id,
grantee_id,
session::uid().unwrap(),
0,
)
.expect("must be valid");
access_check_op(
storage,
&Op::Acl(Acl::RevokePrivilege {
priv_def: priv_def.clone(),
initiator: session::uid().unwrap(),
}),
priv_def.grantor_id(),
)
.unwrap();
on_master_revoke_privilege(&priv_def).unwrap()
}
#[tarantool::test]
fn validate_access_check_ddl() {
let user_name = "box_access_check_space_test_user";
let user_id = make_user(user_name, None);
let storage = Clusterwide::for_tests();
// space
let space_name = "test_box_access_check_ddl";
let space = Space::create(space_name, &SpaceCreateOptions::default()).unwrap();
// create space
{
let space_to_be_created = Ddl::CreateTable {
id: 42,
name: String::from("space_to_be_created"),
format: vec![],
primary_key: vec![],
distribution: Distribution::Global,
engine: SpaceEngineType::Blackhole,
owner: user_id,
};
let e = access_check_ddl(&space_to_be_created, user_id).unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Create access to space 'space_to_be_created' is denied for user '{user_name}'"),
);
grant(
&storage,
PrivilegeType::Create,
SchemaObjectType::Table,
-1,
user_id,
None,
);
access_check_ddl(&space_to_be_created, user_id).unwrap();
}
// drop can be granted with wildcard, check on particular entity works
{
let e = access_check_ddl(
&Ddl::DropTable {
id: space.id(),
initiator: user_id,
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to space '{space_name}' is denied for user '{user_name}'"),
);
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::Table,
-1,
user_id,
None,
);
access_check_ddl(
&Ddl::DropTable {
id: space.id(),
initiator: user_id,
},
user_id,
)
.unwrap();
}
revoke(
&storage,
user_id,
PrivilegeType::Drop,
SchemaObjectType::Table,
-1,
);
// drop on particular entity works
{
let e = access_check_ddl(
&Ddl::DropTable {
id: space.id(),
initiator: user_id,
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to space '{space_name}' is denied for user '{user_name}'"),
);
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::Table,
space.id() as i64,
user_id,
None,
);
access_check_ddl(
&Ddl::DropTable {
id: space.id(),
initiator: user_id,
},
user_id,
)
.unwrap();
}
// owner has privileges on the object
// owner can grant permissions on the object to other users
{
let grantee_user_name = format!("{user_name}_grantee");
let grantee_user_id = make_user(&grantee_user_name, None);
let space_name_grant = format!("{space_name}_grant");
let space_opts = SpaceCreateOptions {
user: Some(user_name.into()),
..Default::default()
};
let space_grant = Space::create(&space_name_grant, &space_opts).unwrap();
let drop_op = |initiator| Op::DdlPrepare {
schema_version: 0,
ddl: Ddl::DropTable {
id: space_grant.id(),
initiator,
},
};
let write_op = |initiator| {
Op::Dml(Dml::Insert {
table: space_grant.id(),
tuple: TupleBuffer::from(Tuple::new(&(1,)).unwrap()),
initiator,
})
};
// owner himself has permission on an object
for op in [drop_op(user_id), write_op(user_id)] {
access_check_op(&storage, &op, user_id).unwrap();
}
// owner can grant permissions to another user
for (privilege, privilege_name, op) in [
(PrivilegeType::Drop, "Drop", drop_op(grantee_user_id)),
(PrivilegeType::Write, "Write", write_op(grantee_user_id)),
] {
// run access check for another user, it fails without grant
let e = access_check_op(&storage, &op, grantee_user_id).unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: {privilege_name} access to space '{space_name_grant}' is denied for user '{grantee_user_name}'"),
);
// grant permission on behalf of the user owning the space
grant(
&storage,
privilege,
SchemaObjectType::Table,
space_grant.id() as _,
grantee_user_id,
Some(user_id),
);
// access check should succeed
access_check_op(&storage, &op, grantee_user_id).unwrap();
}
}
}
#[tarantool::test]
fn validate_access_check_acl_user() {
// create works with passed id
let actor_user_name = "box_access_check_ddl_test_user_actor";
let user_under_test_name = "box_access_check_ddl_test_user";
let storage = Clusterwide::for_tests();
let actor_user_id = make_user(actor_user_name, None);
let user_under_test_id = make_user(user_under_test_name, None);
// create works with passed id
{
let e = access_check_acl(
&storage,
&Acl::CreateUser {
user_def: dummy_user_def(
123,
String::from("user_to_be_created"),
Some(actor_user_id),
),
},
actor_user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Create access to user 'user_to_be_created' is denied for user '{actor_user_name}'"),
);
grant(
&storage,
PrivilegeType::Create,
SchemaObjectType::User,
-1,
actor_user_id,
None,
);
access_check_acl(
&storage,
&Acl::CreateUser {
user_def: dummy_user_def(
123,
String::from("user_to_be_created"),
Some(actor_user_id),
),
},
actor_user_id,
)
.unwrap();
}
// drop can be granted with wildcard, check on particular entity works
{
let e = access_check_acl(
&storage,
&Acl::DropUser {
user_id: user_under_test_id,
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to user '{user_under_test_name}' is denied for user '{actor_user_name}'"),
);
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::User,
-1,
actor_user_id,
None,
);
access_check_acl(
&storage,
&Acl::DropUser {
user_id: user_under_test_id,
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap();
}
// alter can be granted with wildcard, check on particular entity works
{
let e = access_check_acl(
&storage,
&Acl::ChangeAuth {
user_id: user_under_test_id,
auth: dummy_auth_def(),
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Alter access to user '{user_under_test_name}' is denied for user '{actor_user_name}'"),
);
grant(
&storage,
PrivilegeType::Alter,
SchemaObjectType::User,
-1,
actor_user_id,
None,
);
access_check_acl(
&storage,
&Acl::ChangeAuth {
user_id: user_under_test_id,
auth: dummy_auth_def(),
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap();
}
revoke(
&storage,
actor_user_id,
PrivilegeType::Drop,
SchemaObjectType::User,
-1,
);
revoke(
&storage,
actor_user_id,
PrivilegeType::Alter,
SchemaObjectType::User,
-1,
);
// drop on particular entity works
{
let e = access_check_acl(
&storage,
&Acl::DropUser {
user_id: user_under_test_id,
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to user '{user_under_test_name}' is denied for user '{actor_user_name}'"),
);
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::User,
user_under_test_id as i64,
actor_user_id,
None,
);
access_check_acl(
&storage,
&Acl::DropUser {
user_id: user_under_test_id,
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap();
}
// alter on particular entity works
{
let e = access_check_acl(
&storage,
&Acl::ChangeAuth {
user_id: user_under_test_id,
auth: dummy_auth_def(),
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Alter access to user '{user_under_test_name}' is denied for user '{actor_user_name}'"),
);
grant(
&storage,
PrivilegeType::Alter,
SchemaObjectType::User,
user_under_test_id as i64,
actor_user_id,
None,
);
access_check_acl(
&storage,
&Acl::ChangeAuth {
user_id: user_under_test_id,
auth: dummy_auth_def(),
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap();
}
// rename user
{
revoke(
&storage,
actor_user_id,
PrivilegeType::Alter,
SchemaObjectType::User,
user_under_test_id.into(),
);
// actor_user doesn't have alter to user_under_test
let e = access_check_acl(
&storage,
&Acl::RenameUser {
user_id: user_under_test_id,
name: "new_name".into(),
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Alter access to user '{user_under_test_name}' is denied for user '{actor_user_name}'"),
);
grant(
&storage,
PrivilegeType::Alter,
SchemaObjectType::User,
user_under_test_id as i64,
actor_user_id,
None,
);
// after grant actor_user have alter to user_under_test
access_check_acl(
&storage,
&Acl::RenameUser {
user_id: user_under_test_id,
name: "new_name".into(),
initiator: actor_user_id,
schema_version: 0,
},
actor_user_id,
)
.unwrap();
// by default user_under_test has alter on himself
access_check_acl(
&storage,
&Acl::RenameUser {
user_id: user_under_test_id,
name: "new_name".into(),
initiator: actor_user_id,
schema_version: 0,
},
user_under_test_id,
)
.unwrap();
revoke(
&storage,
user_under_test_id,
PrivilegeType::Alter,
SchemaObjectType::User,
user_under_test_id.into(),
);
access_check_acl(
&storage,
&Acl::RenameUser {
user_id: user_under_test_id,
name: "new_name".into(),
initiator: actor_user_id,
schema_version: 0,
},
user_under_test_id,
)
.unwrap_err();
// admin always can rename user
access_check_acl(
&storage,
&Acl::RenameUser {
user_id: user_under_test_id,
name: "new_name".into(),
initiator: actor_user_id,
schema_version: 0,
},
ADMIN_ID,
)
.unwrap();
}
// owner has privileges on the object
// owner can grant permissions on the object to other users
{
let grantee_user_name = format!("{actor_user_name}_grantee");
let grantee_user_id = make_user(&grantee_user_name, None);
let user_name_grant = format!("{actor_user_name}_grant");
let user_id_grant = make_user(&user_name_grant, Some(actor_user_id));
let drop_op = |initiator| {
Op::Acl(Acl::DropUser {
user_id: user_id_grant,
initiator,
schema_version: 0,
})
};
let alter_op = |initiator| {
Op::Acl(Acl::ChangeAuth {
user_id: user_id_grant,
auth: dummy_auth_def(),
initiator,
schema_version: 0,
})
};
// owner himself has permission on an object
for op in [drop_op(actor_user_id), alter_op(actor_user_id)] {
access_check_op(&storage, &op, actor_user_id).unwrap();
}
// owner can grant it to another user
for (privilege, privilege_name, op) in [
(PrivilegeType::Drop, "Drop", drop_op(grantee_user_id)),
(PrivilegeType::Alter, "Alter", alter_op(grantee_user_id)),
] {
// run access check for another user, it fails without grant
let e = access_check_op(&storage, &op, grantee_user_id).unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: {privilege_name} access to user '{user_name_grant}' is denied for user '{grantee_user_name}'"),
);
// grant permission on behalf of the user owning the user
grant(
&storage,
privilege,
SchemaObjectType::User,
user_id_grant as _,
grantee_user_id,
Some(actor_user_id),
);
// access check should succeed
access_check_op(&storage, &op, grantee_user_id).unwrap();
}
}
}
#[tarantool::test]
fn validate_access_check_acl_role() {
let user_name = "box_access_check_ddl_test_role";
let user_id = make_user(user_name, None);
let storage = Clusterwide::for_tests();
let role_name = "box_access_check_ddl_test_role_some_role";
let role_def = UserDef {
id: next_user_id(),
name: String::from(role_name),
schema_version: 0,
owner: ADMIN_ID,
auth: None,
ty: UserMetadataKind::Role,
};
on_master_create_role(&role_def).expect("create role shouldnt fail");
// create works with passed id
{
let role_to_be_created = UserDef {
id: 123,
name: String::from("role_to_be_created"),
schema_version: 0,
owner: user_id,
auth: None,
ty: UserMetadataKind::Role,
};
let e = access_check_acl(
&storage,
&Acl::CreateRole {
role_def: role_to_be_created.clone(),
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Create access to role 'role_to_be_created' is denied for user '{user_name}'"),
);
grant(
&storage,
PrivilegeType::Create,
SchemaObjectType::Role,
-1,
user_id,
None,
);
access_check_acl(
&storage,
&Acl::CreateRole {
role_def: role_to_be_created,
},
user_id,
)
.unwrap();
}
// drop can be granted with wildcard, check on particular entity works
{
let e = access_check_acl(
&storage,
&Acl::DropRole {
role_id: role_def.id,
initiator: user_id,
schema_version: 0,
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to role '{role_name}' is denied for user '{user_name}'"),
);
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::Role,
-1,
user_id,
None,
);
access_check_acl(
&storage,
&Acl::DropRole {
role_id: role_def.id,
initiator: user_id,
schema_version: 0,
},
user_id,
)
.unwrap();
}
revoke(
&storage,
user_id,
PrivilegeType::Drop,
SchemaObjectType::Role,
-1,
);
// drop on particular entity works
{
let e = access_check_acl(
&storage,
&Acl::DropRole {
role_id: role_def.id,
initiator: user_id,
schema_version: 0,
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to role '{role_name}' is denied for user '{user_name}'"),
);
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::Role,
role_def.id as i64,
user_id,
None,
);
access_check_acl(
&storage,
&Acl::DropRole {
role_id: role_def.id,
initiator: user_id,
schema_version: 0,
},
user_id,
)
.unwrap();
}
// owner has privileges on the object
// owner can grant permissions on the object to other users
{
let grantee_user_name = format!("{user_name}_grantee");
let grantee_user_id = make_user(&grantee_user_name, None);
let role_name_grant = format!("{role_name}_grant");
let role_id_grant = next_user_id();
let role_def = UserDef {
id: role_id_grant,
name: role_name_grant.clone(),
schema_version: 0,
owner: user_id,
auth: None,
ty: UserMetadataKind::Role,
};
on_master_create_role(&role_def).expect("create role shouldn't fail");
let op = |initiator| {
Op::Acl(Acl::DropRole {
role_id: role_id_grant,
initiator,
schema_version: 0,
})
};
// owner himself has permission on an object
access_check_op(&storage, &op(user_id), user_id).unwrap();
// run access check for another user, it fails without grant
let e = access_check_op(&storage, &op(grantee_user_id), grantee_user_id).unwrap_err();
assert_eq!(
e.to_string(),
format!("box error: AccessDenied: Drop access to role '{role_name_grant}' is denied for user '{grantee_user_name}'"),
);
// grant permission on behalf of the user owning the role
grant(
&storage,
PrivilegeType::Drop,
SchemaObjectType::Role,
role_id_grant as _,
grantee_user_id,
Some(user_id),
);
// access check should succeed
access_check_op(&storage, &op(grantee_user_id), grantee_user_id).unwrap();
}
}
#[tarantool::test]
fn prohibit_circular_role_grant() {
let storage = Clusterwide::for_tests();
let create_role = |name| {
let id = next_user_id();
let role_def = UserDef {
id,
name: String::from(name),
schema_version: 0,
owner: ADMIN_ID,
auth: None,
ty: UserMetadataKind::Role,
};
on_master_create_role(&role_def).expect("create role shouldn't fail");
global_create_role(&storage, &role_def).expect("create role shouldn't fail");
id
};
let parent_id = create_role("Parent");
let child_id = create_role("Child");
// circular grant: child to parent, parent to child should throw error
{
grant(
&storage,
PrivilegeType::Execute,
SchemaObjectType::Role,
child_id as i64,
parent_id,
None,
);
// grant child to parent
let privilege = PrivilegeDef::new(
PrivilegeType::Execute,
SchemaObjectType::Role,
child_id as i64,
parent_id,
ADMIN_ID,
0,
)
.unwrap();
global_grant_privilege(&storage, &privilege).unwrap();
// grant parent to child
let privilege = PrivilegeDef::new(
PrivilegeType::Execute,
SchemaObjectType::Role,
parent_id as i64,
child_id,
ADMIN_ID,
0,
)
.unwrap();
let e = access_check_acl(
&storage,
&Acl::GrantPrivilege {
priv_def: privilege,
},
ADMIN_ID,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!(
"box error: RoleLoop: Granting role Parent to role Child would create a loop"
),
);
}
}
#[tarantool::test]
fn alter_user_login() {
let storage = Clusterwide::for_tests();
let owner_name = "test_owner";
let user_name = "test_user";
let owner_id = make_user(owner_name, None);
let user_id = make_user(user_name, Some(owner_id));
// User that is not an owner or admin cannot grant/revoke login on user
let e = access_check_acl(
&storage,
&Acl::GrantPrivilege {
priv_def: PrivilegeDef::new(
PrivilegeType::Login,
SchemaObjectType::Universe,
UNIVERSE_ID,
user_id,
user_id,
0,
)
.unwrap(),
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!(
"box error: AccessDenied: Grant Login from '{user_name}' is denied for test_user"
)
);
let e = access_check_acl(
&storage,
&Acl::RevokePrivilege {
priv_def: PrivilegeDef::new(
PrivilegeType::Login,
SchemaObjectType::Universe,
UNIVERSE_ID,
user_id,
user_id,
0,
)
.unwrap(),
initiator: user_id,
},
user_id,
)
.unwrap_err();
assert_eq!(
e.to_string(),
format!(
"box error: AccessDenied: Revoke Login from '{user_name}' is denied for test_user"
)
);
// Owner can grant/revoke login on user
access_check_acl(
&storage,
&Acl::GrantPrivilege {
priv_def: PrivilegeDef::new(
PrivilegeType::Login,
SchemaObjectType::Universe,
UNIVERSE_ID,
user_id,
owner_id,
0,
)
.unwrap(),
},
owner_id,
)
.unwrap();
access_check_acl(
&storage,
&Acl::RevokePrivilege {
priv_def: PrivilegeDef::new(
PrivilegeType::Login,
SchemaObjectType::Universe,
UNIVERSE_ID,
user_id,
owner_id,
0,
)
.unwrap(),
initiator: owner_id,
},
owner_id,
)
.unwrap();
// Admin can grant/revoke login on user
access_check_acl(
&storage,
&Acl::GrantPrivilege {
priv_def: PrivilegeDef::new(
PrivilegeType::Login,
SchemaObjectType::Universe,
UNIVERSE_ID,
user_id,
ADMIN_ID,
0,
)
.unwrap(),
},
ADMIN_ID,
)
.unwrap();
access_check_acl(
&storage,
&Acl::RevokePrivilege {
priv_def: PrivilegeDef::new(
PrivilegeType::Login,
SchemaObjectType::Universe,
UNIVERSE_ID,
user_id,
ADMIN_ID,
0,
)
.unwrap(),
initiator: ADMIN_ID,
},
ADMIN_ID,
)
.unwrap();
}
}