From 5d74aa1e8045051b675db0545c1e3ab90b881bad Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 15 Jun 2023 14:10:49 +0300
Subject: [PATCH] refactor: extract a generic function for applying schema
 changes from snapshot

---
 src/storage.rs | 227 ++++++++++++++++++++++++++++---------------------
 1 file changed, 128 insertions(+), 99 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index 4e597cc16a..e46d92d4d4 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -581,15 +581,15 @@ impl Clusterwide {
     pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> {
         debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() });
 
-        // These need to be saved before we truncate the corresponding space.
-        let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new();
-        for space_def in self.spaces.iter()? {
-            old_space_versions.insert(space_def.id, space_def.schema_version);
-        }
-
+        // These need to be saved before we truncate the corresponding spaces.
+        let mut old_space_versions = HashMap::new();
         let mut old_user_versions = HashMap::new();
-        for user_def in self.users.iter()? {
-            old_user_versions.insert(user_def.id, user_def.schema_version);
+
+        for def in self.spaces.iter()? {
+            old_space_versions.insert(def.id, def.schema_version);
+        }
+        for def in self.users.iter()? {
+            old_user_versions.insert(def.id, def.schema_version);
         }
 
         let mut dont_exist_yet = Vec::new();
@@ -609,8 +609,9 @@ impl Clusterwide {
         // If we're not the replication master, the rest of the data will come
         // via tarantool replication.
         if is_master {
-            self.apply_ddl_changes_on_master(&old_space_versions)?;
-            self.apply_acl_changes_on_master(&old_user_versions)?;
+            self.apply_schema_changes_on_master(self.spaces.iter()?, &old_space_versions)?;
+            // TODO: secondary indexes
+            self.apply_schema_changes_on_master(self.users.iter()?, &old_user_versions)?;
             set_local_schema_version(data.schema_version)?;
         }
 
@@ -633,121 +634,60 @@ impl Clusterwide {
         Ok(())
     }
 
-    pub fn apply_ddl_changes_on_master(
+    fn apply_schema_changes_on_master<T>(
         &self,
-        old_space_versions: &HashMap<SpaceId, u64>,
-    ) -> traft::Result<()> {
+        iter: impl Iterator<Item = T>,
+        old_versions: &HashMap<T::Key, u64>,
+    ) -> traft::Result<()>
+    where
+        T: SchemaDef,
+    {
         debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
 
-        let mut space_defs = Vec::new();
-        let mut new_space_ids = HashSet::new();
-        for space_def in self.spaces.iter()? {
-            new_space_ids.insert(space_def.id);
-            space_defs.push(space_def);
+        let mut new_defs = Vec::new();
+        let mut new_keys = HashSet::new();
+        for def in iter {
+            new_keys.insert(def.key());
+            new_defs.push(def);
         }
 
-        // First we drop all the spaces which have been dropped.
-        for &old_space_id in old_space_versions.keys() {
-            if new_space_ids.contains(&old_space_id) {
+        // First we drop all schema entities which have been dropped.
+        for old_key in old_versions.keys() {
+            if new_keys.contains(old_key) {
                 // Will be handled later.
                 continue;
             }
 
-            // Space was dropped.
-            let abort_reason = ddl_drop_space_on_master(old_space_id)?;
-            if let Some(e) = abort_reason {
-                return Err(Error::other(format!(
-                    "failed to drop space {old_space_id}: {e}"
-                )));
-            }
+            // Schema definition was dropped.
+            T::on_delete(old_key, self)?;
         }
 
-        // Now create any new spaces, or replace ones that changed.
-        for space_def in &space_defs {
-            let space_id = space_def.id;
-
-            if let Some(&v_old) = old_space_versions.get(&space_id) {
-                let v_new = space_def.schema_version;
+        // Now create any new schema entities, or replace ones that changed.
+        for def in &new_defs {
+            let key = def.key();
+            if let Some(&v_old) = old_versions.get(&key) {
+                let v_new = def.schema_version();
                 assert!(v_old <= v_new);
 
                 if v_old == v_new {
-                    // Space is up to date.
+                    // Schema entity is up to date.
                     continue;
                 }
 
-                // Space was updated, need to drop it and recreate.
-                let abort_reason = ddl_drop_space_on_master(space_def.id)?;
-                if let Some(e) = abort_reason {
-                    return Err(Error::other(format!(
-                        "failed to drop space {space_id}: {e}"
-                    )));
-                }
+                // Schema entity changed, need to drop it an recreate.
+                T::on_delete(&key, self)?;
             } else {
-                // New space.
+                // New schema entity.
             }
 
-            if !space_def.operable {
+            if !def.is_operable() {
                 // If it so happens, that we receive an unfinished schema change via snapshot,
                 // which will somehow get finished without the governor sending us
                 // a proc_apply_schema_change rpc, it will be a very sad day.
                 continue;
             }
 
-            let abort_reason = ddl_create_space_on_master(self, space_def.id)?;
-            if let Some(e) = abort_reason {
-                return Err(Error::other(format!(
-                    "failed to create space {}: {e}",
-                    space_def.id
-                )));
-            }
-        }
-
-        // TODO: secondary indexes
-
-        Ok(())
-    }
-
-    pub fn apply_acl_changes_on_master(
-        &self,
-        old_user_versions: &HashMap<UserId, u64>,
-    ) -> traft::Result<()> {
-        let mut user_defs = Vec::new();
-        let mut new_user_ids = HashSet::new();
-        for user_def in self.users.iter()? {
-            new_user_ids.insert(user_def.id);
-            user_defs.push(user_def);
-        }
-
-        // First we drop all users which have been dropped.
-        for &old_user_id in old_user_versions.keys() {
-            if new_user_ids.contains(&old_user_id) {
-                // Will be handled later.
-                continue;
-            }
-
-            // User was dropped.
-            acl_drop_user_on_master(old_user_id)?;
-        }
-
-        // Now create any new users, or replace ones that changed.
-        for user_def in &user_defs {
-            let user_id = user_def.id;
-            if let Some(&v_old) = old_user_versions.get(&user_id) {
-                let v_new = user_def.schema_version;
-                assert!(v_old <= v_new);
-
-                if v_old == v_new {
-                    // User def is up to date.
-                    continue;
-                }
-
-                // User def changed, need to drop it and recreate.
-                acl_drop_user_on_master(user_id)?;
-            } else {
-                // New user.
-            }
-
-            acl_create_user_on_master(user_def)?;
+            T::on_insert(def, self)?;
         }
 
         Ok(())
@@ -2111,6 +2051,95 @@ impl ToEntryIter for Privileges {
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// SchemaDef
+////////////////////////////////////////////////////////////////////////////////
+
+/// Types implemeting this trait describe different kinds of schema definitions
+/// (both ddl and acl).
+///
+/// This trait is currently only used to minimize code duplication in the
+/// [`apply_snapshot_data`] function.
+trait SchemaDef {
+    type Key: std::hash::Hash + Eq;
+    fn key(&self) -> Self::Key;
+    fn schema_version(&self) -> u64;
+    fn is_operable(&self) -> bool {
+        true
+    }
+    fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()>;
+    fn on_delete(key: &Self::Key, storage: &Clusterwide) -> traft::Result<()>;
+}
+
+impl SchemaDef for SpaceDef {
+    type Key = SpaceId;
+
+    #[inline(always)]
+    fn key(&self) -> SpaceId {
+        self.id
+    }
+
+    #[inline(always)]
+    fn schema_version(&self) -> u64 {
+        self.schema_version
+    }
+
+    #[inline(always)]
+    fn is_operable(&self) -> bool {
+        self.operable
+    }
+
+    #[inline(always)]
+    fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()> {
+        let space_id = self.id;
+        if let Some(abort_reason) = ddl_create_space_on_master(storage, space_id)? {
+            return Err(Error::other(format!(
+                "failed to create space {space_id}: {abort_reason}"
+            )));
+        }
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn on_delete(space_id: &SpaceId, storage: &Clusterwide) -> traft::Result<()> {
+        _ = storage;
+        if let Some(abort_reason) = ddl_drop_space_on_master(*space_id)? {
+            return Err(Error::other(format!(
+                "failed to drop space {space_id}: {abort_reason}"
+            )));
+        }
+        Ok(())
+    }
+}
+
+impl SchemaDef for UserDef {
+    type Key = UserId;
+
+    #[inline(always)]
+    fn key(&self) -> UserId {
+        self.id
+    }
+
+    #[inline(always)]
+    fn schema_version(&self) -> u64 {
+        self.schema_version
+    }
+
+    #[inline(always)]
+    fn on_insert(&self, storage: &Clusterwide) -> traft::Result<()> {
+        _ = storage;
+        acl_create_user_on_master(self)?;
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn on_delete(user_id: &UserId, storage: &Clusterwide) -> traft::Result<()> {
+        _ = storage;
+        acl_drop_user_on_master(*user_id)?;
+        Ok(())
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // acl global
 ////////////////////////////////////////////////////////////////////////////////
-- 
GitLab