diff --git a/src/storage.rs b/src/storage.rs index 4b5b0f28f6b452e92dfa0e4579a1f88079e72703..549f55f1b574302fba35e39824933cb31d8367c9 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -22,6 +22,7 @@ use crate::traft::Result; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::marker::PhantomData; +use std::rc::Rc; macro_rules! define_clusterwide_spaces { ( @@ -213,9 +214,20 @@ macro_rules! define_clusterwide_spaces { } #[inline(always)] - fn space(&self, space: $cw_space) -> &Space { - match space.into() { - $( $cw_index::$cw_space_var(_) => &self.$cw_field.space, )+ + fn space_by_name(&self, space: impl AsRef<str>) -> tarantool::Result<Space> { + let space = space.as_ref(); + match space { + $( $cw_space_name => Ok(self.$cw_field.space.clone()), )+ + _ => { + Space::find(space).ok_or_else(|| { + tarantool::set_error!( + tarantool::error::TarantoolErrorCode::NoSuchSpace, + "no such space \"{}\"", + space + ); + tarantool::error::TarantoolError::last().into() + }) + } } } @@ -415,8 +427,12 @@ define_clusterwide_spaces! { impl Clusterwide { pub fn apply_snapshot_data(&self, raw_data: &[u8], is_master: bool) -> Result<()> { let data = SnapshotData::decode(raw_data)?; + let mut dont_exist_yet = Vec::new(); for space_dump in &data.space_dumps { - let space = self.space(space_dump.space); + let Ok(space) = self.space_by_name(&space_dump.space) else { + dont_exist_yet.push(space_dump); + continue; + }; space.truncate()?; let tuples = space_dump.tuples.as_ref(); for tuple in ValueIter::from_array(tuples).map_err(TntError::from)? { @@ -482,8 +498,112 @@ impl Clusterwide { // TODO: check if a space exists here in box.space._space, but doesn't // exist in pico._space, then delete it + // These are likely globally distributed user-defined spaces, which + // just got defined from the metadata which arrived in the _pico_space + // dump. + for space_dump in &dont_exist_yet { + let Ok(space) = self.space_by_name(&space_dump.space) else { + crate::warn_or_panic!("a dump for a non existent space '{}' arrived via snapshot", space_dump.space); + continue; + }; + space.truncate()?; + let tuples = space_dump.tuples.as_ref(); + for tuple in ValueIter::from_array(tuples).map_err(TntError::from)? { + space.insert(RawBytes::new(tuple))?; + } + } + Ok(()) } + + /// Return a `KeyDef` to be used for comparing **tuples** of the + /// corresponding global space. + pub(crate) fn key_def(&self, space: &str, index: IndexId) -> tarantool::Result<Rc<KeyDef>> { + static mut KEY_DEF: Option<HashMap<(ClusterwideSpace, IndexId), Rc<KeyDef>>> = None; + let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) }; + if let Some(sys_space) = space.parse::<ClusterwideSpace>().ok() { + let key_def = match key_defs.entry((sys_space, index)) { + Entry::Occupied(o) => o.into_mut(), + Entry::Vacant(v) => { + let space = self.space_by_name(space).expect("system spaces are always present"); + let index = unsafe { Index::from_ids_unchecked(space.id(), index) }; + let key_def = index.meta()?.to_key_def(); + // System space definition's never change during a single + // execution, so it's safe to cache these + v.insert(Rc::new(key_def)) + } + }; + return Ok(key_def.clone()); + } + + let space = self.space_by_name(space).expect("system spaces are always present"); + let index = unsafe { Index::from_ids_unchecked(space.id(), index) }; + let key_def = index.meta()?.to_key_def(); + Ok(Rc::new(key_def)) + } + + /// Return a `KeyDef` to be used for comparing **keys** of the + /// corresponding global space. + pub(crate) fn key_def_for_key( + &self, + space: &str, + index: IndexId, + ) -> tarantool::Result<Rc<KeyDef>> { + static mut KEY_DEF: Option<HashMap<(ClusterwideSpace, IndexId), Rc<KeyDef>>> = None; + let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) }; + if let Some(sys_space) = space.parse::<ClusterwideSpace>().ok() { + let key_def = match key_defs.entry((sys_space, index)) { + Entry::Occupied(o) => o.into_mut(), + Entry::Vacant(v) => { + let space = self.space_by_name(space).expect("system spaces are always present"); + let index = unsafe { Index::from_ids_unchecked(space.id(), index) }; + let key_def = index.meta()?.to_key_def_for_key(); + // System space definition's never change during a single + // execution, so it's safe to cache these + v.insert(Rc::new(key_def)) + } + }; + return Ok(key_def.clone()); + } + + let space = self.space_by_name(space).expect("system spaces are always present"); + let index = unsafe { Index::from_ids_unchecked(space.id(), index) }; + let key_def = index.meta()?.to_key_def_for_key(); + Ok(Rc::new(key_def)) + } + + pub(crate) fn insert(&self, space: &str, tuple: &TupleBuffer) -> tarantool::Result<Tuple> { + let space = self.space_by_name(space)?; + let res = space.insert(tuple)?; + Ok(res) + } + + pub(crate) fn replace(&self, space: &str, tuple: &TupleBuffer) -> tarantool::Result<Tuple> { + let space = self.space_by_name(space)?; + let res = space.replace(tuple)?; + Ok(res) + } + + pub(crate) fn update( + &self, + space: &str, + key: &TupleBuffer, + ops: &[TupleBuffer], + ) -> tarantool::Result<Option<Tuple>> { + let space = self.space_by_name(space)?; + let res = space.update(key, ops)?; + Ok(res) + } + + pub(crate) fn delete( + &self, + space: &str, + key: &TupleBuffer, + ) -> tarantool::Result<Option<Tuple>> { + let space = self.space_by_name(space)?; + let res = space.delete(key)?; + Ok(res) + } } //////////////////////////////////////////////////////////////////////////////// @@ -528,7 +648,7 @@ impl ClusterwideSpace { tuples.len() ); Ok(SpaceDump { - space: *self, + space: (*self).into(), tuples, }) } @@ -552,66 +672,6 @@ pub trait TClusterwideSpace { /// A type alias for getting the enumeration of indexes for a clusterwide space. pub type IndexOf<T> = <T as TClusterwideSpace>::Index; -impl ClusterwideSpaceIndex { - #[inline] - pub(crate) fn get(&self) -> tarantool::Result<Index> { - let space = self.space().get()?; - let index_name = self.index_name(); - let Some(index) = space.index_cached(index_name) else { - tarantool::set_error!( - tarantool::error::TarantoolErrorCode::NoSuchIndexName, - "no such index \"{}\"", index_name - ); - return Err(tarantool::error::TarantoolError::last().into()); - }; - Ok(index) - } - - /// Return reference to a cached instance of `KeyDef` to be used for - /// comparing **tuples** of the corresponding cluster-wide space. - pub(crate) fn key_def(&self) -> tarantool::Result<&'static KeyDef> { - static mut KEY_DEF: Option<HashMap<ClusterwideSpaceIndex, KeyDef>> = None; - let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) }; - let key_def = match key_defs.entry(*self) { - Entry::Occupied(o) => o.into_mut(), - Entry::Vacant(v) => { - let key_def = self.get()?.meta()?.to_key_def(); - v.insert(key_def) - } - }; - Ok(key_def) - } - - /// Return reference to a cached instance of `KeyDef` to be used for - /// comparing **keys** of the corresponding cluster-wide space. - pub(crate) fn key_def_for_key(&self) -> tarantool::Result<&'static KeyDef> { - static mut KEY_DEF: Option<HashMap<ClusterwideSpaceIndex, KeyDef>> = None; - let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) }; - let key_def = match key_defs.entry(*self) { - Entry::Occupied(o) => o.into_mut(), - Entry::Vacant(v) => { - let key_def = self.get()?.meta()?.to_key_def_for_key(); - v.insert(key_def) - } - }; - Ok(key_def) - } - - #[inline] - pub fn update( - &self, - key: &impl ToTupleBuffer, - ops: &[impl ToTupleBuffer], - ) -> tarantool::Result<Option<Tuple>> { - self.get()?.update(key, ops) - } - - #[inline] - pub fn delete(&self, key: &impl ToTupleBuffer) -> tarantool::Result<Option<Tuple>> { - self.get()?.delete(key) - } -} - impl std::fmt::Display for ClusterwideSpaceIndex { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { if self.is_primary() { @@ -1305,7 +1365,7 @@ impl Encode for SnapshotData {} #[derive(Debug, ::serde::Serialize, ::serde::Deserialize)] pub struct SpaceDump { - pub space: ClusterwideSpace, + pub space: String, #[serde(with = "serde_bytes")] pub tuples: TupleBuffer, } @@ -1545,7 +1605,7 @@ mod tests { // r3 ("i5", "i5-uuid", 5u64, "r3", "r3-uuid", (CGV::Online, 0), (TGV::Online, 0), &faildom,), ] { - storage.space(ClusterwideSpace::Instance).put(&instance).unwrap(); + storage.space_by_name(ClusterwideSpace::Instance).unwrap().put(&instance).unwrap(); let (_, _, raft_id, ..) = instance; space_peer_addresses.put(&(raft_id, format!("addr:{raft_id}"))).unwrap(); } @@ -1653,7 +1713,7 @@ mod tests { ) ); - let space = storage.space(ClusterwideSpace::Instance); + let space = storage.space_by_name(ClusterwideSpace::Instance).unwrap(); space.drop().unwrap(); assert_err!( @@ -1691,11 +1751,13 @@ mod tests { let storage = Clusterwide::new().unwrap(); storage - .space(ClusterwideSpace::Address) + .space_by_name(ClusterwideSpace::Address) + .unwrap() .insert(&(1, "foo")) .unwrap(); storage - .space(ClusterwideSpace::Address) + .space_by_name(ClusterwideSpace::Address) + .unwrap() .insert(&(2, "bar")) .unwrap(); @@ -1761,14 +1823,14 @@ mod tests { assert_eq!(space_dumps.len(), 7); for space_dump in &space_dumps { - match space_dump.space { - ClusterwideSpace::Instance => { + match &space_dump.space { + s if s == &*ClusterwideSpace::Instance => { let [instance]: [Instance; 1] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); assert_eq!(instance, i); } - ClusterwideSpace::Address => { + s if s == &*ClusterwideSpace::Address => { let addrs: [(i32, String); 2] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); assert_eq!( @@ -1777,31 +1839,35 @@ mod tests { ); } - ClusterwideSpace::Property => { + s if s == &*ClusterwideSpace::Property => { let [property]: [(String, String); 1] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); assert_eq!(property, ("foo".to_owned(), "bar".to_owned())); } - ClusterwideSpace::Replicaset => { + s if s == &*ClusterwideSpace::Replicaset => { let [replicaset]: [Replicaset; 1] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); assert_eq!(replicaset, r); } - ClusterwideSpace::Migration => { + s if s == &*ClusterwideSpace::Migration => { let [migration]: [(i32, String); 1] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); assert_eq!(migration, (1, "drop table BANK_ACCOUNTS".to_owned())); } - ClusterwideSpace::Space => { + s if s == &*ClusterwideSpace::Space => { let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); } - ClusterwideSpace::Index => { + s if s == &*ClusterwideSpace::Index => { let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap(); } + + _ => { + unreachable!(); + } } } } @@ -1821,19 +1887,19 @@ mod tests { }; let tuples = [&i].to_tuple_buffer().unwrap(); data.space_dumps.push(SpaceDump { - space: ClusterwideSpace::Instance, + space: ClusterwideSpace::Instance.into(), tuples, }); let tuples = [(1, "google.com"), (2, "ya.ru")].to_tuple_buffer().unwrap(); data.space_dumps.push(SpaceDump { - space: ClusterwideSpace::Address, + space: ClusterwideSpace::Address.into(), tuples, }); let tuples = [("foo", "bar")].to_tuple_buffer().unwrap(); data.space_dumps.push(SpaceDump { - space: ClusterwideSpace::Property, + space: ClusterwideSpace::Property.into(), tuples, }); @@ -1846,7 +1912,7 @@ mod tests { }; let tuples = [&r].to_tuple_buffer().unwrap(); data.space_dumps.push(SpaceDump { - space: ClusterwideSpace::Replicaset, + space: ClusterwideSpace::Replicaset.into(), tuples, }); @@ -1856,7 +1922,7 @@ mod tests { }; let tuples = [&m].to_tuple_buffer().unwrap(); data.space_dumps.push(SpaceDump { - space: ClusterwideSpace::Migration, + space: ClusterwideSpace::Migration.into(), tuples, }); diff --git a/src/traft/node.rs b/src/traft/node.rs index 94ce364effe26876b1cb102bfa4dc9ae90afb102..2c8bf98c12f9c8ede9ec0ac07a1f8982e4ba0f34 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -15,7 +15,7 @@ use crate::schema::ddl_abort_on_master; use crate::schema::{Distribution, IndexDef, SpaceDef}; use crate::storage::pico_schema_version; use crate::storage::ToEntryIter as _; -use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex, PropertyName}; +use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::stringify_cfunc; use crate::tlog; use crate::traft; @@ -115,8 +115,10 @@ impl Status { } } -type StorageWatchers = HashMap<ClusterwideSpaceIndex, watch::Sender<()>>; -type StorageChanges = HashSet<ClusterwideSpaceIndex>; +/// Key is a cluster-wide space name. +type StorageWatchers = HashMap<String, watch::Sender<()>>; +/// Key is a cluster-wide space name. +type StorageChanges = HashSet<String>; /// The heart of `traft` module - the Node. pub struct Node { @@ -337,15 +339,10 @@ impl Node { /// You can also pass a [`ClusterwideSpace`] in which case the space's /// primary index will be used. #[inline(always)] - pub fn storage_watcher(&self, index: impl Into<ClusterwideSpaceIndex>) -> watch::Receiver<()> { - self.storage_watcher_impl(index.into()) - } - - /// A non generic version for optimization. - fn storage_watcher_impl(&self, index: ClusterwideSpaceIndex) -> watch::Receiver<()> { + pub fn storage_watcher(&self, space: impl Into<String>) -> watch::Receiver<()> { use std::collections::hash_map::Entry; let mut watchers = self.watchers.lock(); - match watchers.entry(index) { + match watchers.entry(space.into()) { Entry::Vacant(entry) => { let (tx, rx) = watch::channel(()); entry.insert(tx); @@ -755,13 +752,12 @@ impl NodeImpl { } } Op::Dml(op) => { - if matches!( - op.space(), - ClusterwideSpace::Property | ClusterwideSpace::Replicaset - ) { + let space = op.space(); + if space == &*ClusterwideSpace::Property || space == &*ClusterwideSpace::Replicaset + { *wake_governor = true; } - storage_changes.insert(op.index()); + storage_changes.insert(space.into()); } Op::DdlPrepare { .. } => { *wake_governor = true; @@ -781,11 +777,11 @@ impl NodeImpl { result = instance as _; } Op::Dml(op) => { - let res = match op { - Dml::Insert { space, tuple } => space.insert(&tuple).map(Some), - Dml::Replace { space, tuple } => space.replace(&tuple).map(Some), - Dml::Update { space, key, ops } => space.primary_index().update(&key, &ops), - Dml::Delete { space, key } => space.primary_index().delete(&key), + let res = match &op { + Dml::Insert { space, tuple } => self.storage.insert(space, tuple).map(Some), + Dml::Replace { space, tuple } => self.storage.replace(space, tuple).map(Some), + Dml::Update { space, key, ops } => self.storage.update(space, key, ops), + Dml::Delete { space, key } => self.storage.delete(space, key), }; result = Box::new(res) as _; } diff --git a/src/traft/op.rs b/src/traft/op.rs index 699ca904687d79fdbed316eb3740c963925d99f0..68cac6b3fd88e0106efe2c3ce00158f652ff0f3b 100644 --- a/src/traft/op.rs +++ b/src/traft/op.rs @@ -1,6 +1,6 @@ use crate::instance::Instance; use crate::schema::Distribution; -use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex}; +use crate::storage::Clusterwide; use ::tarantool::index::{IndexId, Part}; use ::tarantool::space::{Field, SpaceId}; use ::tarantool::tlua; @@ -192,17 +192,17 @@ impl From<PersistInstance> for Op { #[serde(tag = "op_kind")] pub enum Dml { Insert { - space: ClusterwideSpace, + space: String, #[serde(with = "serde_bytes")] tuple: TupleBuffer, }, Replace { - space: ClusterwideSpace, + space: String, #[serde(with = "serde_bytes")] tuple: TupleBuffer, }, Update { - space: ClusterwideSpace, + space: String, /// Key in primary index #[serde(with = "serde_bytes")] key: TupleBuffer, @@ -210,7 +210,7 @@ pub enum Dml { ops: Vec<TupleBuffer>, }, Delete { - space: ClusterwideSpace, + space: String, /// Key in primary index #[serde(with = "serde_bytes")] key: TupleBuffer, @@ -243,9 +243,9 @@ impl From<Dml> for Op { impl Dml { /// Serializes `tuple` and returns an [`Dml::Insert`] in case of success. #[inline(always)] - pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { + pub fn insert(space: impl Into<String>, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { let res = Self::Insert { - space, + space: space.into(), tuple: tuple.to_tuple_buffer()?, }; Ok(res) @@ -253,9 +253,12 @@ impl Dml { /// Serializes `tuple` and returns an [`Dml::Replace`] in case of success. #[inline(always)] - pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { + pub fn replace( + space: impl Into<String>, + tuple: &impl ToTupleBuffer, + ) -> tarantool::Result<Self> { let res = Self::Replace { - space, + space: space.into(), tuple: tuple.to_tuple_buffer()?, }; Ok(res) @@ -264,7 +267,7 @@ impl Dml { /// Serializes `key` and returns an [`Dml::Update`] in case of success. #[inline(always)] pub fn update( - space: impl Into<ClusterwideSpace>, + space: impl Into<String>, key: &impl ToTupleBuffer, ops: impl Into<Vec<TupleBuffer>>, ) -> tarantool::Result<Self> { @@ -278,10 +281,7 @@ impl Dml { /// Serializes `key` and returns an [`Dml::Delete`] in case of success. #[inline(always)] - pub fn delete( - space: impl Into<ClusterwideSpace>, - key: &impl ToTupleBuffer, - ) -> tarantool::Result<Self> { + pub fn delete(space: impl Into<String>, key: &impl ToTupleBuffer) -> tarantool::Result<Self> { let res = Self::Delete { space: space.into(), key: key.to_tuple_buffer()?, @@ -290,22 +290,12 @@ impl Dml { } #[rustfmt::skip] - pub fn space(&self) -> ClusterwideSpace { - match &self { - Self::Insert { space, .. } => *space, - Self::Replace { space, .. } => *space, - Self::Update { space, .. } => *space, - Self::Delete { space, .. } => *space, - } - } - - #[rustfmt::skip] - pub fn index(&self) -> ClusterwideSpaceIndex { + pub fn space(&self) -> &str { match &self { - Self::Insert { space, .. } => (*space).into(), - Self::Replace { space, .. } => (*space).into(), - Self::Update { space, .. } => (*space).into(), - Self::Delete { space, .. } => (*space).into(), + Self::Insert { space, .. } => space, + Self::Replace { space, .. } => space, + Self::Update { space, .. } => space, + Self::Delete { space, .. } => space, } } @@ -362,7 +352,7 @@ impl Dml { /// `pico.cas`. #[derive(Clone, Debug, PartialEq, Eq, tlua::LuaRead)] pub struct DmlInLua { - pub space: ClusterwideSpace, + pub space: String, pub kind: DmlKind, pub tuple: Option<TupleBuffer>, pub key: Option<TupleBuffer>, diff --git a/src/traft/rpc/cas.rs b/src/traft/rpc/cas.rs index 1a9407b104d4383a9ae6b455556a0d102275af34..cffc9e811c4941acd869307faff534248c668028 100644 --- a/src/traft/rpc/cas.rs +++ b/src/traft/rpc/cas.rs @@ -273,32 +273,32 @@ impl Predicate { let Some(space) = space(entry_op) else { continue }; - let index = space.primary_index(); - if space.as_str() != range.space { + // TODO: check `space` exists + if space != range.space { continue; } match entry_op { Op::Dml(Dml::Update { key, .. } | Dml::Delete { key, .. }) => { let key = Tuple::new(key)?; - let key_def = index.key_def_for_key()?; - check_bounds(key_def, &key, range)?; + let key_def = storage.key_def_for_key(space, 0)?; + check_bounds(&key_def, &key, range)?; } Op::Dml(Dml::Insert { tuple, .. } | Dml::Replace { tuple, .. }) => { let tuple = Tuple::new(tuple)?; - let key_def = index.key_def()?; - check_bounds(key_def, &tuple, range)?; + let key_def = storage.key_def(space, 0)?; + check_bounds(&key_def, &tuple, range)?; } Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => { - let key_def = index.key_def_for_key()?; + let key_def = storage.key_def_for_key(space, 0)?; for key in ddl_keys.iter() { - check_bounds(key_def, key, range)?; + check_bounds(&key_def, key, range)?; } } Op::PersistInstance(op) => { let key = Tuple::new(&(&op.0.instance_id,))?; - let key_def = index.key_def_for_key()?; - check_bounds(key_def, &key, range)?; + let key_def = storage.key_def_for_key(space, 0)?; + check_bounds(&key_def, &key, range)?; } Op::Nop => (), }; @@ -382,11 +382,13 @@ pub enum Bound { } /// Get space that the operation touches. -fn space(op: &Op) -> Option<ClusterwideSpace> { +fn space(op: &Op) -> Option<&str> { match op { Op::Dml(dml) => Some(dml.space()), - Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => Some(ClusterwideSpace::Property), - Op::PersistInstance(_) => Some(ClusterwideSpace::Instance), + Op::DdlPrepare { .. } | Op::DdlCommit | Op::DdlAbort => { + Some(ClusterwideSpace::Property.into()) + } + Op::PersistInstance(_) => Some(ClusterwideSpace::Instance.into()), Op::Nop => None, } } @@ -569,16 +571,22 @@ mod tests { let space = ClusterwideSpace::Space; let ops = &[ Dml::Insert { - space, + space: space.into(), tuple: tuple.clone(), }, - Dml::Replace { space, tuple }, + Dml::Replace { + space: space.into(), + tuple, + }, Dml::Update { - space, + space: space.into(), key: key.clone(), ops: vec![], }, - Dml::Delete { space, key }, + Dml::Delete { + space: space.into(), + key, + }, ]; let space = ClusterwideSpace::Space.to_string();