From a8ee0087781fd53e69978954ff8db6aee1ed86bd Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 6 Jun 2023 18:15:09 +0300 Subject: [PATCH] feat: acl create user --- src/cas.rs | 11 ++- src/schema.rs | 52 ++++++++++ src/storage.rs | 231 ++++++++++++++++++++++++++++++++++++++++++- src/traft/node.rs | 35 ++++++- src/traft/op.rs | 26 ++++- test/int/test_acl.py | 38 +++++++ 6 files changed, 381 insertions(+), 12 deletions(-) create mode 100644 test/int/test_acl.py diff --git a/src/cas.rs b/src/cas.rs index 19be462f3c..4c4f5cde47 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -26,8 +26,11 @@ use tarantool::tuple::{KeyDef, ToTupleBuffer, Tuple, TupleBuffer}; use once_cell::sync::Lazy; -const PROHIBITED_SPACES: &[ClusterwideSpaceId] = - &[ClusterwideSpaceId::Space, ClusterwideSpaceId::Index]; +const PROHIBITED_SPACES: &[ClusterwideSpaceId] = &[ + ClusterwideSpaceId::Space, + ClusterwideSpaceId::Index, + ClusterwideSpaceId::User, +]; /// Performs a clusterwide compare and swap operation. /// @@ -368,7 +371,7 @@ impl Predicate { return Err(error()); } } - Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => { + Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort | Op::Acl { .. } => { let key_def = storage.key_def_for_key(space, 0)?; for key in ddl_keys.iter() { if range.contains(&key_def, key) { @@ -547,7 +550,7 @@ impl Bound { fn space(op: &Op) -> Option<SpaceId> { match op { Op::Dml(dml) => Some(dml.space()), - Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => { + Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort | Op::Acl { .. } => { Some(ClusterwideSpaceId::Property.into()) } Op::Nop => None, diff --git a/src/schema.rs b/src/schema.rs index b53a07b67c..56fc4a264f 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -25,6 +25,10 @@ use crate::traft::op::{Ddl, DdlBuilder, Op}; use crate::traft::{self, event, node, RaftIndex}; use crate::util::instant_saturating_add; +//////////////////////////////////////////////////////////////////////////////// +// SpaceDef +//////////////////////////////////////////////////////////////////////////////// + /// Space definition. /// /// Describes a user-defined space. @@ -81,6 +85,10 @@ impl SpaceDef { } } +//////////////////////////////////////////////////////////////////////////////// +// Distribution +//////////////////////////////////////////////////////////////////////////////// + /// Defines how to distribute tuples in a space across replicasets. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead)] #[serde(rename_all = "snake_case")] @@ -121,6 +129,10 @@ fn default_bucket_id_field() -> String { } } +//////////////////////////////////////////////////////////////////////////////// +// IndexDef +//////////////////////////////////////////////////////////////////////////////// + /// Index definition. /// /// Describes a user-defined index. @@ -160,6 +172,46 @@ impl IndexDef { } } +//////////////////////////////////////////////////////////////////////////////// +// UserDef +//////////////////////////////////////////////////////////////////////////////// + +/// User definition. +/// +/// Describes a user-defined index. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct UserDef { + pub id: UserId, + pub name: String, + pub schema_version: u64, + pub auth: AuthDef, +} + +pub type UserId = u32; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct AuthDef { + pub method: AuthMethod, + /// Base64 encoded digest. + pub data: String, +} + +::tarantool::define_str_enum! { + pub enum AuthMethod { + ChapSha1 = "chap-sha1", + } +} + +impl Encode for UserDef {} + +impl UserDef { + // TODO +} + +//////////////////////////////////////////////////////////////////////////////// +// ... +//////////////////////////////////////////////////////////////////////////////// + // TODO: this should be a TryFrom in tarantool-module pub fn try_space_field_type_to_index_field_type( ft: tarantool::space::FieldType, diff --git a/src/storage.rs b/src/storage.rs index 19de71ba96..8e04f898d7 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3,6 +3,7 @@ use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType}; use ::tarantool::msgpack::{ArrayWriter, ValueIter}; use ::tarantool::space::UpdateOps; use ::tarantool::space::{FieldType, Space, SpaceId, SystemSpace}; +use ::tarantool::tlua::{self, LuaError}; use ::tarantool::tuple::KeyDef; use ::tarantool::tuple::{Decode, DecodeOwned, Encode}; use ::tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer}; @@ -10,7 +11,7 @@ use ::tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer}; use crate::failure_domain as fd; use crate::instance::{self, grade, Instance}; use crate::replicaset::{Replicaset, ReplicasetId}; -use crate::schema::{Distribution, IndexDef, SpaceDef}; +use crate::schema::{Distribution, IndexDef, SpaceDef, UserDef, UserId}; use crate::tlog; use crate::traft; use crate::traft::error::Error; @@ -475,6 +476,21 @@ define_clusterwide_spaces! { #[allow(clippy::enum_variant_names)] pub enum SpaceIndexIndex; } + User = 520, "_pico_user" => { + Clusterwide::users; + + /// A struct for accessing info of all the user-defined users. + pub struct Users { + space: Space, + #[primary] + index_id: Index => Id = "id", + index_name: Index => Name = "name", + } + + /// An enumeration of indexes defined for "_pico_user". + #[allow(clippy::enum_variant_names)] + pub enum SpaceUserIndex; + } } /// An index of a clusterwide space. @@ -550,12 +566,17 @@ impl Clusterwide { pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> { debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() }); - // We need to save these before truncating _pico_space. + // These need to be saved before we truncate the corresponding space. let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new(); for space_def in self.spaces.iter()? { old_space_versions.insert(space_def.id, space_def.schema_version); } + let mut old_user_versions = HashMap::new(); + for user_def in self.users.iter()? { + old_user_versions.insert(user_def.id, user_def.schema_version); + } + let mut dont_exist_yet = Vec::new(); for space_dump in &data.space_dumps { let space_name = &space_dump.space_name; @@ -574,6 +595,7 @@ impl Clusterwide { // via tarantool replication. if is_master { self.apply_ddl_changes_on_master(&old_space_versions)?; + self.apply_acl_changes_on_master(&old_user_versions)?; set_local_schema_version(data.schema_version)?; } @@ -670,6 +692,52 @@ impl Clusterwide { Ok(()) } + pub fn apply_acl_changes_on_master( + &self, + old_user_versions: &HashMap<UserId, u64>, + ) -> traft::Result<()> { + let mut user_defs = Vec::new(); + let mut new_user_ids = HashSet::new(); + for user_def in self.users.iter()? { + new_user_ids.insert(user_def.id); + user_defs.push(user_def); + } + + // First we drop all users which have been dropped. + for &old_user_id in old_user_versions.keys() { + if new_user_ids.contains(&old_user_id) { + // Will be handled later. + continue; + } + + // User was dropped. + acl_drop_user_on_master(old_user_id)?; + } + + // Now create any new users, or replace ones that changed. + for user_def in &user_defs { + let user_id = user_def.id; + if let Some(&v_old) = old_user_versions.get(&user_id) { + let v_new = user_def.schema_version; + assert!(v_old <= v_new); + + if v_old == v_new { + // User def is up to date. + continue; + } + + // User def changed, need to drop it and recreate. + acl_drop_user_on_master(user_id)?; + } else { + // New user. + } + + acl_create_user_on_master(user_def)?; + } + + Ok(()) + } + /// Return a `KeyDef` to be used for comparing **tuples** of the /// corresponding global space. pub(crate) fn key_def( @@ -1845,6 +1913,154 @@ pub fn ddl_drop_space_on_master(space_id: SpaceId) -> traft::Result<Option<TntEr Ok(res.err()) } +//////////////////////////////////////////////////////////////////////////////// +// Users +//////////////////////////////////////////////////////////////////////////////// + +impl Users { + pub fn new() -> tarantool::Result<Self> { + let space = Space::builder(Self::SPACE_NAME) + .id(Self::SPACE_ID) + .is_local(true) + .is_temporary(false) + .field(("id", FieldType::Unsigned)) + .field(("name", FieldType::String)) + .field(("schema_version", FieldType::Unsigned)) + .field(("auth", FieldType::Array)) + .if_not_exists(true) + .create()?; + + let index_id = space + .index_builder(IndexOf::<Self>::Id.as_str()) + .unique(true) + .part("id") + .if_not_exists(true) + .create()?; + + let index_name = space + .index_builder(IndexOf::<Self>::Name.as_str()) + .unique(true) + .part("name") + .if_not_exists(true) + .create()?; + + Ok(Self { + space, + index_id, + index_name, + }) + } + + #[inline] + pub fn by_id(&self, user_id: UserId) -> tarantool::Result<Option<UserDef>> { + let tuple = self.space.get(&[user_id])?; + let mut res = None; + if let Some(tuple) = tuple { + res = Some(tuple.decode()?); + } + Ok(res) + } + + #[inline] + pub fn by_name(&self, user_name: &str) -> tarantool::Result<Option<UserDef>> { + let tuple = self.index_name.get(&[user_name])?; + let mut res = None; + if let Some(tuple) = tuple { + res = Some(tuple.decode()?); + } + Ok(res) + } + + #[inline] + pub fn replace(&self, user_def: &UserDef) -> tarantool::Result<()> { + self.space.replace(user_def)?; + Ok(()) + } + + #[inline] + pub fn insert(&self, user_def: &UserDef) -> tarantool::Result<()> { + self.space.insert(user_def)?; + Ok(()) + } +} + +impl ToEntryIter for Users { + type Entry = UserDef; + + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// acl global +//////////////////////////////////////////////////////////////////////////////// + +/// Persist a user definition in the internal clusterwide storage. +pub fn acl_global_create_user(storage: &Clusterwide, user_def: &UserDef) -> tarantool::Result<()> { + storage.users.insert(user_def)?; + Ok(()) +} + +//////////////////////////////////////////////////////////////////////////////// +// acl +//////////////////////////////////////////////////////////////////////////////// + +/// Create a tarantool user. Grant it default privileges. +pub fn acl_create_user_on_master(user_def: &UserDef) -> tarantool::Result<()> { + let sys_user = Space::from(SystemSpace::User); + + // This impelemtation was copied from box.schema.user.create excluding the + // password hashing. + let user_id = user_def.id; + let euid = ::tarantool::session::euid()?; + + // Tarantool expects auth info to be a map of form `{ method: data }`, + // and currently the simplest way to achieve this is to use a HashMap. + let auth_map = HashMap::from([(user_def.auth.method, &user_def.auth.data)]); + sys_user.insert(&( + user_id, + euid, + &user_def.name, + "user", + auth_map, + &[(); 0], + 0, + ))?; + + let lua = ::tarantool::lua_state(); + lua.exec_with("box.schema.user.grant(...)", (user_id, "public")) + .map_err(LuaError::from)?; + lua.exec_with( + "box.schema.user.grant(...)", + (user_id, "alter", "user", user_id), + ) + .map_err(LuaError::from)?; + lua.exec_with( + "box.session.su('admin', box.schema.user.grant, ...)", + ( + user_id, + "session,usage", + "universe", + tlua::Nil, + tlua::AsTable((("if_not_exists", true),)), + ), + ) + .map_err(LuaError::from)?; + + Ok(()) +} + +/// Drop a tarantool user and any entities (spaces, etc.) owned by it. +pub fn acl_drop_user_on_master(user_id: UserId) -> tarantool::Result<()> { + let lua = ::tarantool::lua_state(); + lua.exec_with("box.schema.user.drop(...)", user_id) + .map_err(LuaError::from)?; + + Ok(()) +} + //////////////////////////////////////////////////////////////////////////////// // local schema version //////////////////////////////////////////////////////////////////////////////// @@ -2111,7 +2327,8 @@ mod tests { let snapshot_data = Clusterwide::snapshot_data().unwrap(); let space_dumps = snapshot_data.space_dumps; - assert_eq!(space_dumps.len(), 6); + let n_internal_spaces = ClusterwideSpace::values().len(); + assert_eq!(space_dumps.len(), n_internal_spaces); for space_dump in &space_dumps { match &space_dump.space_name { @@ -2150,8 +2367,12 @@ mod tests { let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); } - _ => { - unreachable!(); + s if s == &*ClusterwideSpace::User => { + let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); + } + + s => { + unreachable!("space dump for space '{s}'"); } } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 7068061bba..aeb270efc2 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -13,11 +13,14 @@ use crate::loop_start; use crate::r#loop::FlowControl; use crate::rpc; use crate::schema::{Distribution, IndexDef, SpaceDef}; +use crate::storage::acl_global_create_user; use crate::storage::ddl_meta_drop_space; -use crate::storage::local_schema_version; use crate::storage::SnapshotData; use crate::storage::ToEntryIter as _; +use crate::storage::ToEntryIter as _; +use crate::storage::{acl_create_user_on_master, acl_drop_user_on_master}; use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable}; +use crate::storage::{local_schema_version, set_local_schema_version}; use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; use crate::stringify_cfunc; use crate::sync; @@ -27,7 +30,7 @@ use crate::traft::error::Error; use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::{notification, Notifier, Notify}; -use crate::traft::op::{Ddl, Dml, Op, OpResult}; +use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult}; use crate::traft::Address; use crate::traft::ConnectionPool; use crate::traft::ContextCoercion as _; @@ -969,6 +972,34 @@ impl NodeImpl { .delete(PropertyName::PendingSchemaVersion) .expect("storage error"); } + + Op::Acl { + schema_version, + acl: Acl::CreateUser { user_def }, + } => { + let v_local = local_schema_version().expect("storage error"); + let v_pending = schema_version; + if v_local < v_pending { + if self.is_readonly() { + // Wait for tarantool replication with master to progress. + return SleepAndRetry; + } else { + assert_eq!(user_def.schema_version, v_pending); + acl_create_user_on_master(&user_def).expect("creating user shouldn't fail"); + set_local_schema_version(v_pending).expect("storage error"); + } + } + + acl_global_create_user(&self.storage, &user_def) + .expect("persisting a user definition shouldn't fail"); + + storage_properties + .put(PropertyName::GlobalSchemaVersion, &v_pending) + .expect("storage error"); + storage_properties + .put(PropertyName::NextSchemaVersion, &(v_pending + 1)) + .expect("storage error"); + } } if let Some(lc) = &lc { diff --git a/src/traft/op.rs b/src/traft/op.rs index 4a65e4f438..36682223fe 100644 --- a/src/traft/op.rs +++ b/src/traft/op.rs @@ -1,4 +1,4 @@ -use crate::schema::Distribution; +use crate::schema::{Distribution, UserDef}; use crate::storage::space_by_name; use crate::storage::Clusterwide; use ::tarantool::index::{IndexId, Part}; @@ -45,6 +45,8 @@ pub enum Op { /// /// Only one pending DDL operation can exist at the same time. DdlAbort, + /// Cluster-wide access control list change operation. + Acl { schema_version: u64, acl: Acl }, } impl std::fmt::Display for Op { @@ -109,6 +111,16 @@ impl std::fmt::Display for Op { } Self::DdlCommit => write!(f, "DdlCommit"), Self::DdlAbort => write!(f, "DdlAbort"), + Self::Acl { + schema_version, + acl: Acl::CreateUser { user_def }, + } => { + write!( + f, + r#"CreateUser({schema_version}, {}, "{}")"#, + user_def.id, user_def.name, + ) + } }; struct DisplayAsJson<T>(pub T); @@ -395,6 +407,18 @@ impl DdlBuilder { } } +//////////////////////////////////////////////////////////////////////////////// +// Acl +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "op_kind")] +pub enum Acl { + /// Create a tarantool user. Grant it default privileges. + CreateUser { user_def: UserDef }, +} + mod vec_of_raw_byte_buf { use super::TupleBuffer; use serde::de::Error as _; diff --git a/test/int/test_acl.py b/test/int/test_acl.py new file mode 100644 index 0000000000..6186ae0b1a --- /dev/null +++ b/test/int/test_acl.py @@ -0,0 +1,38 @@ +from conftest import Cluster, Instance + + +def propose_create_user( + instance: Instance, + id: int, + name: str, + password: str, + wait_index: bool = True, + timeout: int = 3, +) -> int: + digest = instance.call("box.internal.prepare_auth", "chap-sha1", password) + schema_version = instance.next_schema_version() + op = dict( + kind="acl", + op_kind="create_user", + user_def=dict( + id=id, + name=name, + auth=dict(type="chap-sha1", digest=digest), + schema_version=schema_version, + ), + ) + # TODO: use pico.cas + return instance.call("pico.raft_propose", op, timeout=timeout) + + +def test_acl_create_user_basic(cluster: Cluster): + i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2) + + username = "Bobby" + index = propose_create_user(i1, id=314, name=username, password="s3cr3t") + + for i in cluster.instances: + i.raft_wait_index(index) + + for i in cluster.instances: + assert i.call("box.space._user.index.name:get", username) is not None -- GitLab