From 5c7367c5e5fe4203522a49d1285912fac46f2e93 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 14 Aug 2023 17:01:11 +0300 Subject: [PATCH] fix: used to panic if master changed during application of snapshot --- src/storage.rs | 40 ++++++++++++++++++++++++++++++++++------ test/int/test_acl.py | 2 +- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/storage.rs b/src/storage.rs index 19d82b2439..c6e79aef71 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -644,13 +644,20 @@ impl Clusterwide { // If we're not the replication master, the rest of the data will come // via tarantool replication. - if is_master { + // + // v_local == v_snapshot could happen for example in case of an + // unfortunately timed replication master failover, in which case the + // schema portion of the snapshot was already applied and we should skip + // it here. + let v_local = local_schema_version()?; + let v_snapshot = data.schema_version; + if is_master && v_local < v_snapshot { self.apply_schema_changes_on_master(self.spaces.iter()?, &old_space_versions)?; // TODO: secondary indexes self.apply_schema_changes_on_master(self.users.iter()?, &old_user_versions)?; self.apply_schema_changes_on_master(self.roles.iter()?, &old_role_versions)?; self.apply_schema_changes_on_master(self.privileges.iter()?, &old_priv_versions)?; - set_local_schema_version(data.schema_version)?; + set_local_schema_version(v_snapshot)?; } for space_dump in &data.space_dumps { @@ -718,7 +725,14 @@ impl Clusterwide { } // Schema definition was dropped. - T::on_delete(old_key, self)?; + if let Err(e) = T::on_delete(old_key, self) { + tlog!( + Error, + "failed handling delete of {} with key {old_key:?}: {e}", + std::any::type_name::<T>(), + ); + return Err(e); + } } // Now create any new schema entities, or replace ones that changed. @@ -734,7 +748,14 @@ impl Clusterwide { } // Schema entity changed, need to drop it an recreate. - T::on_delete(&key, self)?; + if let Err(e) = T::on_delete(&key, self) { + tlog!( + Error, + "failed handling delete of {} with key {key:?}: {e}", + std::any::type_name::<T>(), + ); + return Err(e); + } } else { // New schema entity. } @@ -746,7 +767,14 @@ impl Clusterwide { continue; } - T::on_insert(def, self)?; + if let Err(e) = T::on_insert(def, self) { + tlog!( + Error, + "failed handling insert of {} with key {key:?}: {e}", + std::any::type_name::<T>(), + ); + return Err(e); + } } Ok(()) @@ -2280,7 +2308,7 @@ impl ToEntryIter for Privileges { trait SchemaDef { /// Type of unique key used to identify entities for the purpose of /// associating the schema version with. - type Key: std::hash::Hash + Eq; + type Key: std::hash::Hash + Eq + std::fmt::Debug; /// Extract unique key from the entity. fn key(&self) -> Self::Key; diff --git a/test/int/test_acl.py b/test/int/test_acl.py index 44358c68b8..781a92087d 100644 --- a/test/int/test_acl.py +++ b/test/int/test_acl.py @@ -685,7 +685,7 @@ def test_acl_from_snapshot(cluster: Cluster): i3.raft_compact_log() # - # Cacthup by snapshot. + # Catchup by snapshot. # i4.start() i5.start() -- GitLab