Skip to content
Snippets Groups Projects
Commit 5d74aa1e authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: extract a generic function for applying schema changes from snapshot

parent 1b8475c4
No related branches found
No related tags found
1 merge request!573Implement basic ACL functions
......@@ -581,15 +581,15 @@ impl Clusterwide {
pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> {
debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() });
// These need to be saved before we truncate the corresponding space.
let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new();
for space_def in self.spaces.iter()? {
old_space_versions.insert(space_def.id, space_def.schema_version);
}
// These need to be saved before we truncate the corresponding spaces.
let mut old_space_versions = HashMap::new();
let mut old_user_versions = HashMap::new();
for user_def in self.users.iter()? {
old_user_versions.insert(user_def.id, user_def.schema_version);
for def in self.spaces.iter()? {
old_space_versions.insert(def.id, def.schema_version);
}
for def in self.users.iter()? {
old_user_versions.insert(def.id, def.schema_version);
}
let mut dont_exist_yet = Vec::new();
......@@ -609,8 +609,9 @@ impl Clusterwide {
// If we're not the replication master, the rest of the data will come
// via tarantool replication.
if is_master {
self.apply_ddl_changes_on_master(&old_space_versions)?;
self.apply_acl_changes_on_master(&old_user_versions)?;
self.apply_schema_changes_on_master(self.spaces.iter()?, &old_space_versions)?;
// TODO: secondary indexes
self.apply_schema_changes_on_master(self.users.iter()?, &old_user_versions)?;
set_local_schema_version(data.schema_version)?;
}
......@@ -633,121 +634,60 @@ impl Clusterwide {
Ok(())
}
pub fn apply_ddl_changes_on_master(
fn apply_schema_changes_on_master<T>(
&self,
old_space_versions: &HashMap<SpaceId, u64>,
) -> traft::Result<()> {
iter: impl Iterator<Item = T>,
old_versions: &HashMap<T::Key, u64>,
) -> traft::Result<()>
where
T: SchemaDef,
{
debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
let mut space_defs = Vec::new();
let mut new_space_ids = HashSet::new();
for space_def in self.spaces.iter()? {
new_space_ids.insert(space_def.id);
space_defs.push(space_def);
let mut new_defs = Vec::new();
let mut new_keys = HashSet::new();
for def in iter {
new_keys.insert(def.key());
new_defs.push(def);
}
// First we drop all the spaces which have been dropped.
for &old_space_id in old_space_versions.keys() {
if new_space_ids.contains(&old_space_id) {
// First we drop all schema entities which have been dropped.
for old_key in old_versions.keys() {
if new_keys.contains(old_key) {
// Will be handled later.
continue;
}
// Space was dropped.
let abort_reason = ddl_drop_space_on_master(old_space_id)?;
if let Some(e) = abort_reason {
return Err(Error::other(format!(
"failed to drop space {old_space_id}: {e}"
)));
}
// Schema definition was dropped.
T::on_delete(old_key, self)?;
}
// Now create any new spaces, or replace ones that changed.
for space_def in &space_defs {
let space_id = space_def.id;
if let Some(&v_old) = old_space_versions.get(&space_id) {
let v_new = space_def.schema_version;
// Now create any new schema entities, or replace ones that changed.
for def in &new_defs {
let key = def.key();
if let Some(&v_old) = old_versions.get(&key) {
let v_new = def.schema_version();
assert!(v_old <= v_new);
if v_old == v_new {
// Space is up to date.
// Schema entity is up to date.
continue;
}
// Space was updated, need to drop it and recreate.
let abort_reason = ddl_drop_space_on_master(space_def.id)?;
if let Some(e) = abort_reason {
return Err(Error::other(format!(
"failed to drop space {space_id}: {e}"
)));
}
// Schema entity changed, need to drop it an recreate.
T::on_delete(&key, self)?;
} else {
// New space.
// New schema entity.
}
if !space_def.operable {
if !def.is_operable() {
// If it so happens, that we receive an unfinished schema change via snapshot,
// which will somehow get finished without the governor sending us
// a proc_apply_schema_change rpc, it will be a very sad day.
continue;
}
let abort_reason = ddl_create_space_on_master(self, space_def.id)?;
if let Some(e) = abort_reason {
return Err(Error::other(format!(
"failed to create space {}: {e}",
space_def.id
)));
}
}
// TODO: secondary indexes
Ok(())
}
pub fn apply_acl_changes_on_master(
&self,
old_user_versions: &HashMap<UserId, u64>,
) -> traft::Result<()> {
let mut user_defs = Vec::new();
let mut new_user_ids = HashSet::new();
for user_def in self.users.iter()? {
new_user_ids.insert(user_def.id);
user_defs.push(user_def);
}
// First we drop all users which have been dropped.
for &old_user_id in old_user_versions.keys() {
if new_user_ids.contains(&old_user_id) {
// Will be handled later.
continue;
}
// User was dropped.
acl_drop_user_on_master(old_user_id)?;
}
// Now create any new users, or replace ones that changed.
for user_def in &user_defs {
let user_id = user_def.id;
if let Some(&v_old) = old_user_versions.get(&user_id) {
let v_new = user_def.schema_version;
assert!(v_old <= v_new);
if v_old == v_new {
// User def is up to date.
continue;
}
// User def changed, need to drop it and recreate.
acl_drop_user_on_master(user_id)?;
} else {
// New user.
}
acl_create_user_on_master(user_def)?;
T::on_insert(def, self)?;
}
Ok(())
......@@ -2111,6 +2051,95 @@ impl ToEntryIter for Privileges {
}
}
////////////////////////////////////////////////////////////////////////////////
// SchemaDef
////////////////////////////////////////////////////////////////////////////////
/// Types implemeting this trait describe different kinds of schema definitions
/// (both ddl and acl).
///
/// This trait is currently only used to minimize code duplication in the
/// [`apply_snapshot_data`] function.
trait SchemaDef {
type Key: std::hash::Hash + Eq;
fn key(&self) -> Self::Key;
fn schema_version(&self) -> u64;
fn is_operable(&self) -> bool {
true
}
fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()>;
fn on_delete(key: &Self::Key, storage: &Clusterwide) -> traft::Result<()>;
}
impl SchemaDef for SpaceDef {
type Key = SpaceId;
#[inline(always)]
fn key(&self) -> SpaceId {
self.id
}
#[inline(always)]
fn schema_version(&self) -> u64 {
self.schema_version
}
#[inline(always)]
fn is_operable(&self) -> bool {
self.operable
}
#[inline(always)]
fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()> {
let space_id = self.id;
if let Some(abort_reason) = ddl_create_space_on_master(storage, space_id)? {
return Err(Error::other(format!(
"failed to create space {space_id}: {abort_reason}"
)));
}
Ok(())
}
#[inline(always)]
fn on_delete(space_id: &SpaceId, storage: &Clusterwide) -> traft::Result<()> {
_ = storage;
if let Some(abort_reason) = ddl_drop_space_on_master(*space_id)? {
return Err(Error::other(format!(
"failed to drop space {space_id}: {abort_reason}"
)));
}
Ok(())
}
}
impl SchemaDef for UserDef {
type Key = UserId;
#[inline(always)]
fn key(&self) -> UserId {
self.id
}
#[inline(always)]
fn schema_version(&self) -> u64 {
self.schema_version
}
#[inline(always)]
fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()> {
_ = storage;
acl_create_user_on_master(self)?;
Ok(())
}
#[inline(always)]
fn on_delete(user_id: &UserId, storage: &Clusterwide) -> traft::Result<()> {
_ = storage;
acl_drop_user_on_master(*user_id)?;
Ok(())
}
}
////////////////////////////////////////////////////////////////////////////////
// acl global
////////////////////////////////////////////////////////////////////////////////
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment