diff --git a/src/traft/rpc/cas.rs b/src/traft/rpc/cas.rs index b32e538b1f346dd7aa770ee13f110423e6c9ca3c..baa9a78eff82a2ce3c55adc117deb28a4efbd2c7 100644 --- a/src/traft/rpc/cas.rs +++ b/src/traft/rpc/cas.rs @@ -1,5 +1,5 @@ -use crate::storage::TClusterwideSpace as _; -use crate::storage::{ClusterwideSpace, Indexes, Properties, Spaces}; +use crate::storage::{Clusterwide, TClusterwideSpace as _}; +use crate::storage::{ClusterwideSpace, Indexes, Spaces}; use crate::tlog; use crate::traft::error::Error as TraftError; use crate::traft::node; @@ -121,7 +121,7 @@ crate::define_rpc_request! { assert_eq!(entry.term, status.term); let entry_index = entry.index; let Some(op) = entry.into_op() else { continue }; - req.predicate.check_entry(entry_index, &op, &storage.properties)?; + req.predicate.check_entry(entry_index, &op, &storage)?; } } @@ -136,7 +136,7 @@ crate::define_rpc_request! { let Some(EntryContext::Normal(EntryContextNormal { op, .. })) = cx else { continue; }; - req.predicate.check_entry(entry.index, &op, &storage.properties)?; + req.predicate.check_entry(entry.index, &op, &storage)?; } // TODO: apply to limbo first @@ -221,7 +221,7 @@ impl Predicate { &self, entry_index: RaftIndex, entry_op: &Op, - properties: &Properties, + storage: &Clusterwide, ) -> std::result::Result<(), Error> { let error = || Error::ConflictFound { requested: self.index, @@ -265,7 +265,7 @@ impl Predicate { .expect("keys should convert to tuple") }); for range in &self.ranges { - if modifies_operable(entry_op, &range.space, properties) { + if modifies_operable(entry_op, &range.space, storage) { return Err(error()); } let Some(space) = space(entry_op) else { @@ -333,11 +333,19 @@ fn space(op: &Op) -> Option<ClusterwideSpace> { } /// Checks if the operation would by its semantics modify `operable` flag of the provided `space`. -fn modifies_operable(op: &Op, space: &str, properties: &Properties) -> bool { +fn modifies_operable(op: &Op, space: &str, storage: &Clusterwide) -> bool { let ddl_modifies = |ddl: &Ddl| match ddl { Ddl::CreateSpace { name, .. } => name == space, - Ddl::DropSpace { id: _id } => { - todo!("we should put clusterwide spaces defs into _picodata_space first") + Ddl::DropSpace { id } => { + if let Some(space_def) = storage + .spaces + .get(*id) + .expect("deserialization should not fail") + { + space_def.name == space + } else { + false + } } Ddl::CreateIndex { .. } => false, Ddl::DropIndex { .. } => false, @@ -345,7 +353,8 @@ fn modifies_operable(op: &Op, space: &str, properties: &Properties) -> bool { match op { Op::DdlPrepare { ddl, .. } => ddl_modifies(ddl), Op::DdlCommit | Op::DdlAbort => { - if let Some(change) = properties + if let Some(change) = storage + .properties .pending_schema_change() .expect("conversion should not fail") { @@ -362,47 +371,120 @@ mod tests { use serde::Serialize; use tarantool::tuple::ToTupleBuffer; + use crate::schema::{Distribution, SpaceDef}; use crate::storage::TClusterwideSpace as _; use crate::storage::{Clusterwide, Properties, PropertyName}; + use crate::traft::op::DdlBuilder; use super::*; #[::tarantool::test] fn ddl() { - let clusterwide = Clusterwide::new().unwrap(); - - let predicate = Predicate { - index: 1, - term: 1, - ranges: vec![Range { - space: Properties::SPACE_NAME.into(), - key_min: Bound::Included( - (PropertyName::PendingSchemaChange,) - .to_tuple_buffer() - .unwrap(), - ), - key_max: Bound::Included( - (PropertyName::PendingSchemaChange,) - .to_tuple_buffer() - .unwrap(), - ), - }], - }; - let err = predicate - .check_entry(2, &Op::DdlCommit, &clusterwide.properties) - .unwrap_err(); - assert_eq!( - &err.to_string(), - "comparison failed for index 1 as it conflicts with 2" + #[track_caller] + fn test<T: Serialize + ?Sized>( + op: &Op, + space: &str, + key: &T, + storage: &Clusterwide, + ) -> std::result::Result<(), Error> { + let predicate = Predicate { + index: 1, + term: 1, + ranges: vec![Range { + space: space.into(), + key_min: Bound::Included((key,).to_tuple_buffer().unwrap()), + key_max: Bound::Included((key,).to_tuple_buffer().unwrap()), + }], + }; + predicate.check_entry(2, op, storage) + } + let storage = Clusterwide::new().unwrap(); + + let builder = DdlBuilder::with_schema_version(1); + + let space_name = "space1"; + let space_id = 1; + let index_id = 1; + + let create_space = builder.create_space( + space_id, + space_name.into(), + vec![], + vec![], + Distribution::Global, ); + let drop_space = builder.drop_space(space_id); + let create_index = builder.create_index(space_id, index_id, vec![]); + let drop_index = builder.drop_index(space_id, index_id); + + let commit = Op::DdlCommit; + let abort = Op::DdlAbort; + + let all = [ + &create_space, + &drop_space, + &create_index, + &drop_index, + &commit, + &abort, + ]; + + for op in all { + assert!(test( + op, + Properties::SPACE_NAME, + PropertyName::PendingSchemaChange.into(), + &storage + ) + .is_err()); + assert!(test( + op, + Properties::SPACE_NAME, + PropertyName::PendingSchemaVersion.into(), + &storage + ) + .is_err()); + assert!(test(op, Properties::SPACE_NAME, "unrelated_key", &storage).is_ok()); + assert!(test(op, "unrelated_space", "unrelated_key", &storage).is_ok()) + } - Predicate { - index: 1, - term: 1, - ranges: vec![], + // `DropSpace` needs `SpaceDef` to get space name + storage + .spaces + .insert(&SpaceDef { + id: space_id, + name: space_name.into(), + operable: true, + distribution: Distribution::Global, + format: vec![], + schema_version: 1, + }) + .unwrap(); + for op in [&create_space, &drop_space] { + assert!(test(op, space_name, "any_key", &storage).is_err()); + } + + for op in [&create_index, &drop_index] { + assert!(test(op, space_name, "any_key", &storage).is_ok()); + } + + // Abort and Commit need a pending schema change to get space name + let Op::DdlPrepare{ ddl: create_space_ddl, ..} = create_space.clone() else { + unreachable!(); + }; + storage + .properties + .put(PropertyName::PendingSchemaChange, &create_space_ddl) + .unwrap(); + for op in [&abort, &commit] { + assert!(test( + op, + Properties::SPACE_NAME, + PropertyName::CurrentSchemaVersion.into(), + &storage + ) + .is_err()); } - .check_entry(2, &Op::DdlCommit, &clusterwide.properties) - .unwrap(); } #[::tarantool::test] @@ -443,11 +525,7 @@ mod tests { TestOp::Delete => Dml::Delete { space, key }, } }) - .map(|op| { - predicate - .check_entry(2, &Op::Dml(op), &storage.properties) - .is_err() - }) + .map(|op| predicate.check_entry(2, &Op::Dml(op), &storage).is_err()) .collect() }