From 190104c94eef0b09dcef7c0dbc7a8f6d5023162b Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 29 Aug 2024 20:23:13 +0300 Subject: [PATCH] fix: plugin cas used to check against wrong pico property --- src/cas.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/src/cas.rs b/src/cas.rs index 89a7ac8422..89d8ed7ae3 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -482,6 +482,12 @@ impl Error { Self::EmptyBatch => TarantoolErrorCode::IllegalParams as _, } } + + #[allow(non_snake_case)] + #[inline(always)] + pub fn ConflictFound(conflict_index: RaftIndex) -> Self { + Self::ConflictFound { conflict_index } + } } /// Represents a lua table describing a [`Predicate`]. @@ -573,9 +579,6 @@ impl Predicate { entry_op: &Op, storage: &Clusterwide, ) -> std::result::Result<(), Error> { - let error = || Error::ConflictFound { - conflict_index: entry_index, - }; let check_dml = |op: &Dml, space_id: u32, range: &Range| -> std::result::Result<(), Error> { match op { @@ -583,14 +586,14 @@ impl Predicate { let key = Tuple::new(key)?; let key_def = storage::cached_key_def_for_key(space_id, 0)?; if range.contains(&key_def, &key) { - return Err(error()); + return Err(Error::ConflictFound(entry_index)); } } Dml::Insert { tuple, .. } | Dml::Replace { tuple, .. } => { let tuple = Tuple::new(tuple)?; let key_def = storage::cached_key_def(space_id, 0)?; if range.contains(&key_def, &tuple) { - return Err(error()); + return Err(Error::ConflictFound(entry_index)); } } } @@ -598,7 +601,7 @@ impl Predicate { }; for range in &self.ranges { if modifies_operable(entry_op, range.table, storage) { - return Err(error()); + return Err(Error::ConflictFound(entry_index)); } // TODO: check operation's space exists @@ -617,22 +620,31 @@ impl Predicate { check_dml(dml, dml.space(), range)? } } - Op::DdlPrepare { .. } - | Op::DdlCommit - | Op::DdlAbort { .. } - | Op::Acl { .. } - | Op::Plugin { .. } => { + Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort { .. } | Op::Acl { .. } => { let space = ClusterwideTable::Property.id(); if space != range.table { continue; } let key_def = storage::cached_key_def_for_key(space, 0)?; for key in schema_related_property_keys() { + // NOTE: this is just a string comparison if range.contains(&key_def, key) { - return Err(error()); + return Err(Error::ConflictFound(entry_index)); } } } + Op::Plugin { .. } => { + let space = ClusterwideTable::Property.id(); + if space != range.table { + continue; + } + let key_def = storage::cached_key_def_for_key(space, 0)?; + let key = pending_plugin_operation_key(); + // NOTE: this is just a string comparison + if range.contains(&key_def, key) { + return Err(Error::ConflictFound(entry_index)); + } + } Op::Nop => (), }; } @@ -667,6 +679,16 @@ fn schema_related_property_keys() -> &'static [Tuple] { } } +fn pending_plugin_operation_key() -> &'static Tuple { + static mut TUPLE: Option<Tuple> = None; + // Safety: only called from main thread + unsafe { + TUPLE.get_or_insert_with(|| { + Tuple::new(&[storage::PropertyName::PendingPluginOperation]).expect("cannot fail") + }) + } +} + /// Returns a slice of [`Range`] structs which are needed for the CaS /// request which performs a schema change operation. pub fn schema_change_ranges() -> &'static [Range] { @@ -763,6 +785,27 @@ impl Range { }) } + pub fn for_op(op: &Op) -> Result<Vec<Self>> { + match op { + Op::Nop => Ok(vec![]), + Op::Dml(dml) => { + let range = Self::for_dml(dml)?; + Ok(vec![range]) + } + Op::BatchDml { ops } => ops.iter().map(Self::for_dml).collect(), + Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort { .. } | Op::Acl { .. } => { + let range = Self::new(ClusterwideTable::Property) + .eq(&[storage::PropertyName::GlobalSchemaVersion]); + Ok(vec![range]) + } + Op::Plugin { .. } => { + let range = Self::new(ClusterwideTable::Property) + .eq(&[storage::PropertyName::PendingPluginOperation]); + Ok(vec![range]) + } + } + } + #[inline(always)] pub fn from_parts( table: impl Into<SpaceId>, -- GitLab