From e9b1b821cd18477b2b3a2108bf79a2af89255e60 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 2 Jun 2023 19:24:27 +0300
Subject: [PATCH] fix: ddl drop space from snapshot

---
 src/storage.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 60 insertions(+), 6 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index 9326516a7e..7b65f2aa05 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -19,7 +19,7 @@ use crate::traft::RaftId;
 use crate::traft::Result;
 
 use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::marker::PhantomData;
 use std::rc::Rc;
 
@@ -550,6 +550,12 @@ impl Clusterwide {
     pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> {
         debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() });
 
+        // We need to save these before truncating _pico_space.
+        let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new();
+        for space_def in self.spaces.iter()? {
+            old_space_versions.insert(space_def.id, space_def.schema_version);
+        }
+
         let mut dont_exist_yet = Vec::new();
         for space_dump in &data.space_dumps {
             let space_name = &space_dump.space_name;
@@ -567,7 +573,7 @@ impl Clusterwide {
         // If we're not the replication master, the rest of the data will come
         // via tarantool replication.
         if is_master {
-            self.apply_ddl_changes_on_replicaset_master()?;
+            self.apply_ddl_changes_on_replicaset_master(&old_space_versions)?;
             set_local_schema_version(data.schema_version)?;
         }
 
@@ -590,8 +596,59 @@ impl Clusterwide {
         Ok(())
     }
 
-    pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> {
+    pub fn apply_ddl_changes_on_replicaset_master(
+        &self,
+        old_space_versions: &HashMap<SpaceId, u64>,
+    ) -> traft::Result<()> {
+        debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+
+        let mut space_defs = Vec::new();
+        let mut new_space_ids = HashSet::new();
         for space_def in self.spaces.iter()? {
+            new_space_ids.insert(space_def.id);
+            space_defs.push(space_def);
+        }
+
+        // First we drop all the spaces which have been dropped.
+        for &old_space_id in old_space_versions.keys() {
+            if new_space_ids.contains(&old_space_id) {
+                // Will be handled later.
+                continue;
+            }
+
+            // Space was dropped.
+            let abort_reason = ddl_drop_space_on_master(old_space_id)?;
+            if let Some(e) = abort_reason {
+                return Err(Error::other(format!(
+                    "failed to drop space {old_space_id}: {e}"
+                )));
+            }
+        }
+
+        // Now create any new spaces, or replace ones that changed.
+        for space_def in &space_defs {
+            let space_id = space_def.id;
+
+            if let Some(&v_old) = old_space_versions.get(&space_id) {
+                let v_new = space_def.schema_version;
+                assert!(v_old <= v_new);
+
+                if v_old == v_new {
+                    // Space is up to date.
+                    continue;
+                }
+
+                // Space was updated, need to drop it and recreate.
+                let abort_reason = ddl_drop_space_on_master(space_def.id)?;
+                if let Some(e) = abort_reason {
+                    return Err(Error::other(format!(
+                        "failed to drop space {space_id}: {e}"
+                    )));
+                }
+            } else {
+                // New space.
+            }
+
             if !space_def.operable {
                 // If it so happens, that we receive an unfinished schema change via snapshot,
                 // which will somehow get finished without the governor sending us
@@ -610,9 +667,6 @@ impl Clusterwide {
 
         // TODO: secondary indexes
 
-        // TODO: check if a space exists here in box.space._space, but doesn't
-        // exist in pico._space, then delete it
-
         Ok(())
     }
 
-- 
GitLab
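
Note on the logic above: the new apply_ddl_changes_on_replicaset_master effectively diffs two "space id -> schema version" maps taken before and after the snapshot is applied. Spaces missing from the new map are dropped, spaces whose schema version increased are dropped and later recreated, and spaces absent from the old map are created fresh. The standalone Rust sketch below illustrates only that planning step under simplified assumptions: SpaceAction and plan_space_changes are illustrative names rather than picodata's API, and plain u64 ids and versions stand in for SpaceId and the space definitions.

use std::collections::HashMap;

#[derive(Debug)]
enum SpaceAction {
    Drop(u64),            // space existed before the snapshot, absent from it now
    DropAndRecreate(u64), // schema version bumped, so the definition changed
    Create(u64),          // space only appears in the snapshot
}

// Plan the DDL actions needed to go from the pre-snapshot state (`old`)
// to the state described by the snapshot (`new`). Drops of removed spaces
// come first, mirroring the order of the two loops in the patch.
fn plan_space_changes(
    old: &HashMap<u64, u64>, // space id -> schema version before the snapshot
    new: &HashMap<u64, u64>, // space id -> schema version from the snapshot
) -> Vec<SpaceAction> {
    let mut actions = Vec::new();

    // Spaces that existed before but are gone from the snapshot must be dropped.
    for &space_id in old.keys() {
        if !new.contains_key(&space_id) {
            actions.push(SpaceAction::Drop(space_id));
        }
    }

    // Spaces in the snapshot are either unchanged, changed (drop + recreate),
    // or brand new (create).
    for (&space_id, &v_new) in new {
        match old.get(&space_id) {
            Some(&v_old) if v_old == v_new => {} // up to date, nothing to do
            Some(_) => actions.push(SpaceAction::DropAndRecreate(space_id)),
            None => actions.push(SpaceAction::Create(space_id)),
        }
    }

    actions
}

fn main() {
    // Space 512 was dropped, 513 changed its schema, 514 is unchanged, 515 is new.
    let old = HashMap::from([(512, 1), (513, 1), (514, 2)]);
    let new = HashMap::from([(513, 3), (514, 2), (515, 1)]);
    for action in plan_space_changes(&old, &new) {
        println!("{action:?}");
    }
}

The actual patch additionally checks space_def.operable for unfinished schema changes and delegates the drops to ddl_drop_space_on_master, aborting on the first failure.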