diff --git a/src/lib.rs b/src/lib.rs index f3d4c21c68241072fa8539d2eca59155f540070f..dd0012b4ff87ee6ce5d496296c603deef34b4b4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,6 +77,7 @@ pub mod tarantool; pub mod tier; pub mod tlog; pub mod to_rmpv_named; +pub mod topology_cache; pub mod traft; pub mod util; pub mod vshard; diff --git a/src/plugin/rpc/client.rs b/src/plugin/rpc/client.rs index ccaaab178424f5675f25087ec01e3975db37b5be..9843db1facbd411f3d71b3d9bfd5169ed54f1213 100644 --- a/src/plugin/rpc/client.rs +++ b/src/plugin/rpc/client.rs @@ -2,11 +2,11 @@ use crate::error_code::ErrorCode; use crate::has_states; use crate::instance::InstanceName; use crate::plugin::{rpc, PluginIdentifier}; -use crate::replicaset::Replicaset; use crate::replicaset::ReplicasetState; -use crate::schema::ServiceRouteItem; -use crate::schema::ServiceRouteKey; use crate::tlog; +use crate::topology_cache::ServiceRouteCheck; +use crate::topology_cache::TopologyCache; +use crate::topology_cache::TopologyCacheRef; use crate::traft::error::Error; use crate::traft::network::ConnectionPool; use crate::traft::node::Node; @@ -17,10 +17,8 @@ use picodata_plugin::util::copy_to_region; use std::time::Duration; use tarantool::error::BoxError; use tarantool::error::Error as TntError; -use tarantool::error::IntoBoxError; use tarantool::error::TarantoolErrorCode; use tarantool::fiber; -use tarantool::tuple::Tuple; use tarantool::tuple::TupleBuffer; use tarantool::tuple::{RawByteBuf, RawBytes}; use tarantool::uuid::Uuid; @@ -56,16 +54,13 @@ pub(crate) fn send_rpc_request( ) -> Result<&'static [u8], Error> { let node = crate::traft::node::global()?; let pool = &node.plugin_manager.pool; + let topology = &node.topology_cache; let timeout = Duration::from_secs_f64(timeout); - let instance_name = resolve_rpc_target(plugin_identity, service, target, node)?; - - let my_instance_name = node - .raft_storage - .instance_name()? - .expect("should be persisted at this point"); + let instance_name = resolve_rpc_target(plugin_identity, service, target, node, topology)?; + let my_instance_name = topology.my_instance_name(); if path.starts_with('.') { return call_builtin_stored_proc(pool, path, input, &instance_name, timeout); } @@ -194,24 +189,24 @@ fn resolve_rpc_target( service: &str, target: &FfiSafeRpcTargetSpecifier, node: &Node, + topology: &TopologyCache, ) -> Result<InstanceName, Error> { use FfiSafeRpcTargetSpecifier as Target; let mut to_master_chosen = false; let mut tier_and_bucket_id = None; let mut by_replicaset_name = None; - let mut tier_name_owned; match target { Target::InstanceName(name) => { // // Request to a specific instance, single candidate // // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call - let instance_name = InstanceName::from(unsafe { name.as_str() }); + let instance_name = unsafe { name.as_str() }; // A single instance was chosen - check_route_to_instance(node, plugin, service, &instance_name)?; - return Ok(instance_name); + check_route_to_instance(topology, plugin, service, instance_name)?; + return Ok(instance_name.into()); } &Target::Replicaset { @@ -227,11 +222,7 @@ fn resolve_rpc_target( bucket_id, to_master, } => { - tier_name_owned = node - .raft_storage - .tier()? 
- .expect("storage for instance should exists"); - let tier = &*tier_name_owned; + let tier = topology.my_tier_name(); to_master_chosen = to_master; tier_and_bucket_id = Some((tier, bucket_id)); } @@ -250,83 +241,57 @@ fn resolve_rpc_target( let mut by_bucket_id = None; let mut tier_and_replicaset_uuid = None; - let mut replicaset_tuple = None; - let mut replicaset_uuid_owned; + let replicaset_uuid_owned; if let Some((tier, bucket_id)) = tier_and_bucket_id { + // This call must be done here, because this function may yield + // but later we take a volatile reference to TopologyCache which can't be held across yields replicaset_uuid_owned = vshard::get_replicaset_uuid_by_bucket_id(tier, bucket_id)?; let uuid = &*replicaset_uuid_owned; by_bucket_id = Some((tier, bucket_id, uuid)); tier_and_replicaset_uuid = Some((tier, uuid)); } + let topology_ref = topology.get(); + // // Request to replicaset master, single candidate // if to_master_chosen { + let mut target_master_name = None; + if let Some(replicaset_name) = by_replicaset_name { - let tuple = node.storage.replicasets.get_raw(replicaset_name)?; - replicaset_tuple = Some(tuple); + let replicaset = topology_ref.replicaset_by_name(replicaset_name)?; + target_master_name = Some(&replicaset.target_master_name); } if let Some((_, _, replicaset_uuid)) = by_bucket_id { - let tuple = node.storage.replicasets.by_uuid_raw(replicaset_uuid)?; - replicaset_tuple = Some(tuple); + let replicaset = topology_ref.replicaset_by_uuid(replicaset_uuid)?; + target_master_name = Some(&replicaset.target_master_name); } - let tuple = replicaset_tuple.expect("set in one of ifs above"); - let Some(master_name) = tuple - .field(Replicaset::FIELD_TARGET_MASTER_NAME) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into()); - }; + let instance_name = target_master_name.expect("set in one of ifs above"); // A single instance was chosen - check_route_to_instance(node, plugin, service, &master_name)?; - return Ok(master_name); + check_route_to_instance(topology, plugin, service, instance_name)?; + return Ok(instance_name.clone()); } if let Some(replicaset_name) = by_replicaset_name { - let tuple = node.storage.replicasets.get_raw(replicaset_name)?; - let Some(found_replicaset_uuid) = tuple - .field(Replicaset::FIELD_REPLICASET_UUID) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into()); - }; - replicaset_uuid_owned = found_replicaset_uuid; - - let Some(found_tier) = tuple - .field::<'_, String>(Replicaset::FIELD_TIER) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into()); - }; - tier_name_owned = found_tier; - - replicaset_tuple = Some(tuple); - tier_and_replicaset_uuid = Some((&tier_name_owned, &replicaset_uuid_owned)); + let replicaset = topology_ref.replicaset_by_name(replicaset_name)?; + tier_and_replicaset_uuid = Some((&replicaset.tier, &replicaset.uuid)); } - let my_instance_name = node - .raft_storage - .instance_name()? 
- .expect("should be persisted at this point"); - // Get list of all possible targets - let mut all_instances_with_service = node - .storage - .service_route_table - .get_available_instances(plugin, service)?; + let mut all_instances_with_service: Vec<_> = topology_ref + .instances_running_service(&plugin.name, &plugin.version, service) + .collect(); - // Get list of all possible targets - filter_instances_by_state(node, &mut all_instances_with_service)?; + // Remove non-online instances + filter_instances_by_state(&topology_ref, &mut all_instances_with_service)?; #[rustfmt::skip] if all_instances_with_service.is_empty() { + // TODO: put service definitions into TopologyCache as well if node.storage.services.get(plugin, service)?.is_none() { return Err(BoxError::new(ErrorCode::NoSuchService, "service '{plugin}.{service}' not found").into()); } else { @@ -334,6 +299,8 @@ fn resolve_rpc_target( } }; + let my_instance_name = topology.my_instance_name(); + // // Request to any replica in replicaset, multiple candidates // @@ -347,22 +314,26 @@ fn resolve_rpc_target( // XXX: this shouldn't be a problem if replicasets aren't too big, // but if they are we might want to construct a HashSet from candidates for instance_name in replicas { - if my_instance_name == instance_name { + if my_instance_name == &*instance_name { // Prefer someone else instead of self skipped_self = true; continue; } - if all_instances_with_service.contains(&instance_name) { + if all_instances_with_service.contains(&&*instance_name) { return Ok(instance_name); } } // In case there's no other suitable candidates, fallback to calling self if skipped_self && all_instances_with_service.contains(&my_instance_name) { - return Ok(my_instance_name); + return Ok(my_instance_name.into()); } - check_replicaset_is_not_expelled(node, replicaset_uuid, replicaset_tuple)?; + let replicaset = topology_ref.replicaset_by_uuid(replicaset_uuid)?; + if replicaset.state == ReplicasetState::Expelled { + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::ReplicasetExpelled, format!("replicaset with id {replicaset_uuid} was expelled")).into()); + } #[rustfmt::skip] return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {plugin}.{service}")).into()); @@ -381,17 +352,17 @@ fn resolve_rpc_target( let index = (random_index + 1) % candidates.len(); instance_name = std::mem::take(&mut candidates[index]); } - return Ok(instance_name); + return Ok(instance_name.into()); } fn filter_instances_by_state( - node: &Node, - instance_names: &mut Vec<InstanceName>, + topology_ref: &TopologyCacheRef, + instance_names: &mut Vec<&str>, ) -> Result<(), Error> { let mut index = 0; while index < instance_names.len() { let name = &instance_names[index]; - let instance = node.storage.instances.get(name)?; + let instance = topology_ref.instance_by_name(name)?; if has_states!(instance, Expelled -> *) || !instance.may_respond() { instance_names.swap_remove(index); } else { @@ -403,63 +374,35 @@ fn filter_instances_by_state( } fn check_route_to_instance( - node: &Node, + topology: &TopologyCache, plugin: &PluginIdentifier, service: &str, - instance_name: &InstanceName, + instance_name: &str, ) -> Result<(), Error> { - let instance = node.storage.instances.get(instance_name)?; + let topology_ref = topology.get(); + let instance = topology_ref.instance_by_name(instance_name)?; if has_states!(instance, Expelled -> *) { #[rustfmt::skip] return Err(BoxError::new(ErrorCode::InstanceExpelled, format!("instance 
named '{instance_name}' was expelled")).into()); } - if !instance.may_respond() { #[rustfmt::skip] return Err(BoxError::new(ErrorCode::InstanceUnavaliable, format!("instance with instance_name \"{instance_name}\" can't respond due it's state"),).into()); } - let res = node.storage.service_route_table.get_raw(&ServiceRouteKey { - instance_name, - plugin_name: &plugin.name, - plugin_version: &plugin.version, - service_name: service, - })?; - #[rustfmt::skip] - let Some(tuple) = res else { - return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not running on {instance_name}")).into()); - }; - let res = tuple - .field(ServiceRouteItem::FIELD_POISON) - .map_err(IntoBoxError::into_box_error)?; - let Some(is_poisoned) = res else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "invalid contents has _pico_service_route").into()); - }; - if is_poisoned { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{plugin}.{service}' is poisoned on {instance_name}")).into()); - } - Ok(()) -} -fn check_replicaset_is_not_expelled( - node: &Node, - uuid: &str, - maybe_tuple: Option<Tuple>, -) -> Result<(), Error> { - let tuple; - if let Some(t) = maybe_tuple { - tuple = t; - } else { - tuple = node.storage.replicasets.by_uuid_raw(uuid)?; - } + let check = + topology_ref.check_service_route(&plugin.name, &plugin.version, service, instance_name); - let state = tuple.field(Replicaset::FIELD_STATE)?; - let state: ReplicasetState = state.expect("replicaset should always have a state column"); - - if state == ReplicasetState::Expelled { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::ReplicasetExpelled, format!("replicaset with id {uuid} was expelled")).into()); + match check { + ServiceRouteCheck::Ok => {} + ServiceRouteCheck::RoutePoisoned => { + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{plugin}.{service}' is poisoned on {instance_name}")).into()); + } + ServiceRouteCheck::ServiceNotEnabled => { + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not running on {instance_name}")).into()); + } } Ok(()) diff --git a/src/schema.rs b/src/schema.rs index cf9c6c83a4b0708a95f5364cc68c59c0ea9bb9ac..dd7c70f2cb7c6cd336b8b2decb8235a4b7e18a26 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -665,9 +665,12 @@ pub struct ServiceRouteKey<'a> { /// Instance name. pub instance_name: &'a InstanceName, } - impl<'a> Encode for ServiceRouteKey<'a> {} +//////////////////////////////////////////////////////////////////////////////// +// PluginMigrationRecord +//////////////////////////////////////////////////////////////////////////////// + /// Single record in _pico_plugin_migration system table. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct PluginMigrationRecord { @@ -708,6 +711,10 @@ impl PluginMigrationRecord { } } +//////////////////////////////////////////////////////////////////////////////// +// PluginConfigRecord +//////////////////////////////////////////////////////////////////////////////// + /// Single record in _pico_plugin_config system table. 
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct PluginConfigRecord {
diff --git a/src/storage.rs b/src/storage.rs
index a31bec980e39f7b3abf4392f4b6b2d17e5525a69..e5cbb2616a226783a6809e67c81e7e8fb8312937 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -2768,6 +2768,15 @@ impl ServiceRouteTable {
     }
 }
 
+impl ToEntryIter<MP_SERDE> for ServiceRouteTable {
+    type Entry = ServiceRouteItem;
+
+    #[inline(always)]
+    fn index_iter(&self) -> tarantool::Result<IndexIterator> {
+        self.space.select(IteratorType::All, &())
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // PluginMigration
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/topology_cache.rs b/src/topology_cache.rs
new file mode 100644
index 0000000000000000000000000000000000000000..86f9931e9da08c37f69b52fff0ebed10a63fbc7d
--- /dev/null
+++ b/src/topology_cache.rs
@@ -0,0 +1,729 @@
+#![allow(unused_parens)]
+use crate::instance::Instance;
+use crate::replicaset::Replicaset;
+use crate::schema::ServiceRouteItem;
+use crate::storage::Clusterwide;
+use crate::storage::ToEntryIter;
+use crate::tier::Tier;
+#[allow(unused_imports)]
+use crate::tlog;
+use crate::traft::error::Error;
+use crate::traft::error::IdOfInstance;
+#[allow(unused_imports)]
+use crate::traft::node::NodeImpl;
+use crate::traft::RaftId;
+use crate::traft::Result;
+use std::cell::OnceCell;
+use std::collections::HashMap;
+use tarantool::fiber::safety::NoYieldsRef;
+use tarantool::fiber::safety::NoYieldsRefCell;
+
+pub type TopologyCacheRef<'a> = NoYieldsRef<'a, TopologyCacheMutable>;
+
+////////////////////////////////////////////////////////////////////////////////
+// TopologyCache
+////////////////////////////////////////////////////////////////////////////////
+
+/// Stores deserialized info from system tables like _pico_instance,
+/// _pico_replicaset, etc. The data in this struct is automatically kept up to
+/// date in [`NodeImpl::advance`]: whenever new raft entries or raft snapshots
+/// are handled, the data is updated correspondingly.
+///
+/// Doesn't implement `Clone` by design, must be wrapped in an `Rc`.
+pub struct TopologyCache {
+    /// Stores topology data which may change at run time.
+    ///
+    /// This data is volatile, so it must be guarded by `NoYieldsRefCell`. Other
+    /// data in this struct is immutable, so it's safe to hold references to it
+    /// across yields.
+    inner: NoYieldsRefCell<TopologyCacheMutable>,
+
+    /// Raft id is always known and never changes, so it can be cached here.
+    pub my_raft_id: RaftId,
+
+    /// Instance name never changes on a running instance once it's determined, so it can be cached here.
+    pub my_instance_name: OnceCell<String>,
+
+    /// Instance uuid never changes on a running instance once it's determined, so it can be cached here.
+    pub my_instance_uuid: OnceCell<String>,
+
+    /// Replicaset name never changes on a running instance once it's determined, so it can be cached here.
+    pub my_replicaset_name: OnceCell<String>,
+
+    /// Replicaset uuid never changes on a running instance once it's determined, so it can be cached here.
+    pub my_replicaset_uuid: OnceCell<String>,
+
+    /// Tier name never changes on a running instance once it's determined, so it can be cached here.
+    pub my_tier_name: OnceCell<String>,
+}
+
+impl TopologyCache {
+    /// Initializes the cache by loading all of the contents from the
+    /// _pico_instance, _pico_replicaset and other topology system tables.
+    pub fn load(storage: &Clusterwide, my_raft_id: RaftId) -> Result<Self> {
+        let inner = TopologyCacheMutable::load(storage, my_raft_id)?;
+
+        let my_instance_name = OnceCell::new();
+        let my_instance_uuid = OnceCell::new();
+        let my_replicaset_name = OnceCell::new();
+        let my_replicaset_uuid = OnceCell::new();
+        let my_tier_name = OnceCell::new();
+        if let Some(this_instance) = inner.this_instance.get() {
+            // If at this point we know this instance's info, we cache it immediately.
+            // Otherwise this info will be set in `update_instance` below as soon as it's known.
+            my_instance_name
+                .set(this_instance.name.to_string())
+                .expect("was empty");
+            my_instance_uuid
+                .set(this_instance.uuid.clone())
+                .expect("was empty");
+            my_replicaset_name
+                .set(this_instance.replicaset_name.to_string())
+                .expect("was empty");
+            my_replicaset_uuid
+                .set(this_instance.replicaset_uuid.clone())
+                .expect("was empty");
+            my_tier_name
+                .set(this_instance.tier.clone())
+                .expect("was empty");
+        }
+
+        Ok(Self {
+            my_raft_id,
+            my_instance_name,
+            my_instance_uuid,
+            my_replicaset_name,
+            my_replicaset_uuid,
+            my_tier_name,
+            inner: NoYieldsRefCell::new(inner),
+        })
+    }
+
+    /// Drop all cached data and load it all again from the storage.
+    ///
+    /// This is called from [`NodeImpl::advance`] after receiving a snapshot.
+    pub fn full_reload(&self, storage: &Clusterwide) -> Result<()> {
+        let mut inner = self.inner.borrow_mut();
+        // Drop the old data and replace it with new data loaded from the storage
+        *inner = TopologyCacheMutable::load(storage, self.my_raft_id)?;
+        Ok(())
+    }
+
+    /// Get access to the volatile topology info, i.e. info which may change
+    /// during execution.
+    ///
+    /// Returns a reference object which **MUST NOT** be held across yields.
+    /// Use this if you want to avoid redundant copies.
+    ///
+    /// Use [`Self::clone_instance_by_name`], [`Self::clone_replicaset_by_uuid`]
+    /// if you need the copies anyway.
+    ///
+    /// Use [`Self::my_instance_name`], [`Self::my_tier_name`], etc. if you need
+    /// access to the immutable info of the current instance.
+ #[inline(always)] + #[track_caller] + pub fn get(&self) -> NoYieldsRef<TopologyCacheMutable> { + self.inner.borrow() + } + + #[inline(always)] + pub fn clone_this_instance(&self) -> Instance { + self.get() + .this_instance + .get() + .expect("don't call this until it's known") + .clone() + } + + #[inline(always)] + pub fn my_instance_name(&self) -> &str { + self.my_instance_name + .get() + .expect("don't call this until it's known") + } + + #[inline(always)] + pub fn my_instance_uuid(&self) -> &str { + self.my_instance_uuid + .get() + .expect("don't call this until it's known") + } + + #[inline(always)] + pub fn my_replicaset_name(&self) -> &str { + self.my_replicaset_name + .get() + .expect("don't call this until it's known") + } + + #[inline(always)] + pub fn my_replicaset_uuid(&self) -> &str { + self.my_replicaset_uuid + .get() + .expect("don't call this until it's known") + } + + #[inline(always)] + pub fn my_tier_name(&self) -> &str { + self.my_tier_name + .get() + .expect("don't call this until it's known") + } + + #[inline(always)] + pub fn clone_instance_by_name(&self, name: &str) -> Result<Instance> { + self.get().instance_by_name(name).cloned() + } + + #[inline(always)] + pub fn clone_instance_by_uuid(&self, uuid: &str) -> Result<Instance> { + self.get().instance_by_uuid(uuid).cloned() + } + + #[inline(always)] + pub fn clone_replicaset_by_name(&self, name: &str) -> Result<Replicaset> { + self.get().replicaset_by_name(name).cloned() + } + + #[inline(always)] + pub fn clone_replicaset_by_uuid(&self, uuid: &str) -> Result<Replicaset> { + self.get().replicaset_by_uuid(uuid).cloned() + } + + /// Updates the instance record. + /// + /// This function should only be called from [`NodeImpl::handle_dml_entry`]. + /// + /// [`NodeImpl::handle_dml_entry`]: crate::traft::node::NodeImpl::handle_dml_entry + #[inline(always)] + pub(crate) fn update_instance(&self, old: Option<Instance>, new: Option<Instance>) { + if let Some(new) = &new { + if self.my_instance_name.get().is_none() && new.raft_id == self.my_raft_id { + self.my_instance_name + .set(new.name.to_string()) + .expect("was empty"); + self.my_instance_uuid + .set(new.uuid.clone()) + .expect("was empty"); + self.my_replicaset_name + .set(new.replicaset_name.to_string()) + .expect("was empty"); + self.my_replicaset_uuid + .set(new.replicaset_uuid.clone()) + .expect("was empty"); + self.my_tier_name.set(new.tier.clone()).expect("was empty"); + } + } + + self.inner.borrow_mut().update_instance(old, new) + } + + /// Updates the replicaset record. + /// + /// This function should only be called from [`NodeImpl::handle_dml_entry`]. + #[inline(always)] + pub(crate) fn update_replicaset(&self, old: Option<Replicaset>, new: Option<Replicaset>) { + self.inner.borrow_mut().update_replicaset(old, new) + } + + /// Updates the tier record. + /// + /// This function should only be called from [`NodeImpl::handle_dml_entry`]. + #[inline(always)] + pub(crate) fn update_tier(&self, old: Option<Tier>, new: Option<Tier>) { + self.inner.borrow_mut().update_tier(old, new) + } + + /// Updates the service route record. + /// + /// This function should only be called from [`NodeImpl::handle_dml_entry`]. 
+    #[inline(always)]
+    pub(crate) fn update_service_route(
+        &self,
+        old: Option<ServiceRouteItem>,
+        new: Option<ServiceRouteItem>,
+    ) {
+        self.inner.borrow_mut().update_service_route(old, new)
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// TopologyCacheMutable
+////////////////////////////////////////////////////////////////////////////////
+
+/// Stores topology info which may change during execution. Because the data is
+/// volatile, access to it from separate fibers must be guarded by a [`NoYieldsRefCell`].
+/// `fiber::Mutex` also works, but may lead to deadlocks, and it's better to
+/// just never hold such references across yields, which is enforced by NoYieldsRefCell.
+#[derive(Default)]
+pub struct TopologyCacheMutable {
+    /// Raft id is always known and never changes.
+    my_raft_id: RaftId,
+
+    /// Info about the current instance.
+    ///
+    /// This may not be known for a short moment when the instance bootstraps a
+    /// cluster after it initialized the raft node and before it handled the
+    /// bootstrap entries.
+    this_instance: OnceCell<Instance>,
+
+    /// Info about the current instance's replicaset.
+    ///
+    /// This may not be known for a short moment when the instance joins the
+    /// cluster after it initialized its raft node and before it received the
+    /// raft log entry/snapshot with the replicaset info. After that point this
+    /// will never be `None` again.
+    this_replicaset: OnceCell<Replicaset>,
+
+    /// Info about the current instance's tier.
+    ///
+    /// It may not be known for a short moment, but once it's known it's always known.
+    this_tier: OnceCell<Tier>,
+
+    /// FIXME: there's space for optimization here. We use `String` as key which
+    /// requires additional memory allocation + cache misses. This would be
+    /// improved if we used a `Uuid` type, but there's a problem with that,
+    /// because we don't use `Uuid` type anywhere else which means we'll need to
+    /// add conversions from String to Uuid, which cancels out some of the
+    /// performance gain. We can't use `SmolStr` because it only stores up to
+    /// 23 bytes in-place, while a stringified `Uuid` is 36 bytes. There's no
+    /// good reason why a small string can't store 36 bytes in-place, but we'd
+    /// have to implement our own. Thankfully it's going to be super fricken ez,
+    /// we just have to do it..
+    instances_by_name: HashMap<String, Instance>,
+    instance_name_by_uuid: HashMap<String, String>,
+
+    /// We store replicasets by uuid because in some places we need bucket_id
+    /// based routing which only works via vshard at the moment, and vshard only
+    /// knows about replicaset uuid, so we must optimize the lookup via uuid.
+    /// For instances there's no such need and we don't have any APIs which
+    /// operate on instance uuids at the moment, so we optimize for lookup based
+    /// on instance name.
+    replicasets_by_uuid: HashMap<String, Replicaset>,
+    replicaset_uuid_by_name: HashMap<String, String>,
+
+    tiers_by_name: HashMap<String, Tier>,
+
+    /// The meaning of the data is as follows:
+    /// ```ignore
+    /// HashMap<PluginName, HashMap<PluginVersion, HashMap<ServiceName, HashMap<InstanceName, IsPoisoned>>>>
+    /// ```
+    ///
+    /// XXX: This monstrosity exists because rust is such a great language.
+    /// What we want is to have HashMap<(String, String, String, String), T>,
+    /// but this would mean that HashMap::get would have to take a
+    /// &(String, String, String, String), which means we'd need to construct 4 strings
+    /// every time we want to read the data. And passing a (&str, &str, &str, &str)
+    /// will not compile. We need this code to be blazingly fast though, so we
+    /// do this...
+    #[allow(clippy::type_complexity)]
+    service_routes: HashMap<String, HashMap<String, HashMap<String, HashMap<String, bool>>>>,
+}
+
+impl TopologyCacheMutable {
+    pub fn load(storage: &Clusterwide, my_raft_id: RaftId) -> Result<Self> {
+        let mut this_instance = None;
+        let mut this_replicaset = None;
+        let mut this_tier = None;
+
+        let mut instances_by_name = HashMap::default();
+        let mut instance_name_by_uuid = HashMap::default();
+        let mut replicasets_by_uuid = HashMap::default();
+        let mut replicaset_uuid_by_name = HashMap::default();
+        let mut tiers_by_name = HashMap::default();
+        let mut service_routes = HashMap::default();
+
+        let instances = storage.instances.all_instances()?;
+        for instance in instances {
+            let instance_name = instance.name.to_string();
+
+            if instance.raft_id == my_raft_id {
+                this_instance = Some(instance.clone());
+            }
+
+            instance_name_by_uuid.insert(instance.uuid.clone(), instance_name.clone());
+            instances_by_name.insert(instance_name, instance);
+        }
+
+        let replicasets = storage.replicasets.iter()?;
+        for replicaset in replicasets {
+            let replicaset_uuid = replicaset.uuid.clone();
+            if let Some(instance) = &this_instance {
+                if replicaset_uuid == instance.replicaset_uuid {
+                    this_replicaset = Some(replicaset.clone());
+                }
+            }
+
+            let replicaset_name = replicaset.name.to_string();
+            replicaset_uuid_by_name.insert(replicaset_name, replicaset_uuid.clone());
+            replicasets_by_uuid.insert(replicaset_uuid, replicaset);
+        }
+
+        let tiers = storage.tiers.iter()?;
+        for tier in tiers {
+            if let Some(instance) = &this_instance {
+                if tier.name == instance.tier {
+                    this_tier = Some(tier.clone());
+                }
+            }
+
+            tiers_by_name.insert(tier.name.clone(), tier);
+        }
+
+        let items = storage.service_route_table.iter()?;
+        for item in items {
+            // Note: the nesting order must match `update_service_route` and
+            // `check_service_route`: plugin -> version -> service -> instance.
+            service_routes
+                .entry(item.plugin_name)
+                .or_insert_with(HashMap::default)
+                .entry(item.plugin_version)
+                .or_insert_with(HashMap::default)
+                .entry(item.service_name)
+                .or_insert_with(HashMap::default)
+                .insert(item.instance_name.into(), item.poison);
+        }
+
+        Ok(Self {
+            my_raft_id,
+            this_instance: once_cell_from_option(this_instance),
+            this_replicaset: once_cell_from_option(this_replicaset),
+            this_tier: once_cell_from_option(this_tier),
+            instances_by_name,
+            instance_name_by_uuid,
+            replicasets_by_uuid,
+            replicaset_uuid_by_name,
+            tiers_by_name,
+            service_routes,
+        })
+    }
+
+    pub fn instance_by_name(&self, name: &str) -> Result<&Instance> {
+        let this_instance = self
+            .this_instance
+            .get()
+            .expect("should be known at this point");
+        if this_instance.name == name {
+            return Ok(this_instance);
+        }
+
+        let Some(instance) = self.instances_by_name.get(name) else {
+            return Err(Error::NoSuchInstance(IdOfInstance::Name(name.into())));
+        };
+
+        Ok(instance)
+    }
+
+    pub fn instance_by_uuid(&self, uuid: &str) -> Result<&Instance> {
+        let this_instance = self
+            .this_instance
+            .get()
+            .expect("should be known at this point");
+        if this_instance.uuid == uuid {
+            return Ok(this_instance);
+        }
+
+        let Some(name) = self.instance_name_by_uuid.get(uuid) else {
+            return Err(Error::NoSuchInstance(IdOfInstance::Uuid(uuid.into())));
+        };
+
+        self.instance_by_name(name)
+    }
+
+    pub fn replicaset_by_name(&self, name: &str) -> Result<&Replicaset> {
+        if let Some(this_replicaset) = self.this_replicaset.get() {
+            if this_replicaset.name == name {
+                return Ok(this_replicaset);
+            }
+        }
+
+        let Some(uuid) =
self.replicaset_uuid_by_name.get(name) else { + return Err(Error::NoSuchReplicaset { + name: name.into(), + id_is_uuid: false, + }); + }; + + self.replicaset_by_uuid(uuid) + } + + pub fn replicaset_by_uuid(&self, uuid: &str) -> Result<&Replicaset> { + if let Some(this_replicaset) = self.this_replicaset.get() { + if this_replicaset.uuid == uuid { + return Ok(this_replicaset); + } + } + + let Some(replicaset) = self.replicasets_by_uuid.get(uuid) else { + return Err(Error::NoSuchReplicaset { + name: uuid.into(), + id_is_uuid: true, + }); + }; + + Ok(replicaset) + } + + pub fn tier_by_name(&self, name: &str) -> Result<&Tier> { + if let Some(this_tier) = self.this_tier.get() { + if this_tier.name == name { + return Ok(this_tier); + } + } + + let Some(tier) = self.tiers_by_name.get(name) else { + return Err(Error::NoSuchTier(name.into())); + }; + + Ok(tier) + } + + pub fn check_service_route( + &self, + plugin: &str, + version: &str, + service: &str, + instance_name: &str, + ) -> ServiceRouteCheck { + let Some(is_poisoned) = self + .service_routes + .get(plugin) + .and_then(|m| m.get(version)) + .and_then(|m| m.get(service)) + .and_then(|m| m.get(instance_name)) + else { + return ServiceRouteCheck::ServiceNotEnabled; + }; + + if *is_poisoned { + ServiceRouteCheck::RoutePoisoned + } else { + ServiceRouteCheck::Ok + } + } + + pub fn instances_running_service( + &self, + plugin: &str, + version: &str, + service: &str, + ) -> impl Iterator<Item = &str> { + // Is this code readable? Asking for a friend + self.service_routes + .get(plugin) + .and_then(|m| m.get(version)) + .and_then(|m| m.get(service)) + .map(|m| m.iter()) + .into_iter() + .flatten() + .filter(|(_, &is_poisoned)| !is_poisoned) + .map(|(i, _)| i.as_str()) + } + + fn update_instance(&mut self, old: Option<Instance>, new: Option<Instance>) { + if let Some(new) = &new { + if new.raft_id == self.my_raft_id { + once_cell_replace(&mut self.this_instance, new.clone()); + } + } + + match (old, new) { + // Create new instance + (None, Some(new)) => { + let new_uuid = new.uuid.clone(); + let new_name = new.name.to_string(); + + let old_cached_name = self + .instance_name_by_uuid + .insert(new_uuid, new_name.clone()); + debug_assert!(old_cached_name.is_none()); + + let old_cached = self.instances_by_name.insert(new_name, new); + debug_assert!(old_cached.is_none()); + } + + // Update instance + (Some(old), Some(new)) => { + // XXX: if the primary key changes, this logic must be updated + debug_assert_eq!(old.name, new.name, "instance name is the primary key"); + + if old != new { + let new_uuid = new.uuid.clone(); + let new_name = new.name.to_string(); + + let same_uuid = (old.uuid == new_uuid); + if !same_uuid { + let old_cached_name = self + .instance_name_by_uuid + .insert(new_uuid, new_name.clone()); + debug_assert!(old_cached_name.is_none()); + + // The new instance replaces the old one, so the old + // uuid -> name mapping should be removed + let old_cached = self.instance_name_by_uuid.remove(&old.uuid); + debug_assert!(old_cached.is_some()); + } + + let old_cached = self.instances_by_name.insert(new_name, new); + debug_assert_eq!(old_cached, Some(old)); + } + } + + // Delete instance + (Some(old), None) => { + let old_cached_name = self.instance_name_by_uuid.remove(&old.uuid); + debug_assert_eq!(old_cached_name.as_deref(), Some(&*old.name)); + + let old_cached = self.instances_by_name.remove(&*old.name); + debug_assert_eq!(old_cached, Some(old)); + } + + (None, None) => unreachable!(), + } + } + + fn update_replicaset(&mut self, old: 
Option<Replicaset>, new: Option<Replicaset>) { + match (&new, self.this_instance.get()) { + (Some(new), Some(this_instance)) if new.name == this_instance.replicaset_name => { + once_cell_replace(&mut self.this_replicaset, new.clone()); + } + _ => {} + } + + match (old, new) { + // Create new replicaset + (None, Some(new)) => { + let new_uuid = new.uuid.clone(); + let new_name = new.name.to_string(); + + let old_cached_uuid = self + .replicaset_uuid_by_name + .insert(new_name, new_uuid.clone()); + debug_assert!(old_cached_uuid.is_none()); + + let old_cached = self.replicasets_by_uuid.insert(new_uuid, new); + debug_assert!(old_cached.is_none()); + } + + // Update replicaset + (Some(old), Some(new)) => { + // XXX: if the primary key changes, this logic must be updated + debug_assert_eq!(old.name, new.name, "replicaset name is the primary key"); + + if old != new { + let new_uuid = new.uuid.clone(); + let new_name = new.name.to_string(); + + let same_uuid = (old.uuid == new_uuid); + if !same_uuid { + let old_cached_uuid = self + .replicaset_uuid_by_name + .insert(new_name, new_uuid.clone()); + debug_assert_eq!(old_cached_uuid.as_ref(), Some(&old.uuid)); + + // The new replicaset replaces the old one, so that one + // should be removed. This is needed because our cache + // stores replicasets by `uuid` (primary key of sorts), + // while in storage the primary key is `name`. Maybe we + // should change the primary key in storage to `uuid` as well + let old_cached = self.replicasets_by_uuid.remove(&old.uuid); + debug_assert!(old_cached.is_some()); + } + + let old_cached = self.replicasets_by_uuid.insert(new_uuid, new); + debug_assert_eq!(old_cached, same_uuid.then_some(old)); + } + } + + // Delete replicaset + (Some(old), None) => { + let old_cached_uuid = self.replicaset_uuid_by_name.remove(&*old.name); + debug_assert_eq!(old_cached_uuid.as_ref(), Some(&old.uuid)); + + let old_cached = self.replicasets_by_uuid.remove(&old.uuid); + debug_assert_eq!(old_cached, Some(old)); + } + + (None, None) => unreachable!(), + } + } + + fn update_tier(&mut self, old: Option<Tier>, new: Option<Tier>) { + if let Some(new) = new { + let new_name = new.name.clone(); + + if let Some(this_instance) = self.this_instance.get() { + if new_name == this_instance.tier { + once_cell_replace(&mut self.this_tier, new.clone()); + } + } + + // Create new tier or update old tier + let old_cached = self.tiers_by_name.insert(new_name, new); + debug_assert_eq!(old_cached, old); + } else if let Some(old) = old { + // Delete tier + let old_cached = self.tiers_by_name.remove(&old.name); + debug_assert_eq!(old_cached.as_ref(), Some(&old)); + } else { + unreachable!() + } + } + + fn update_service_route( + &mut self, + old: Option<ServiceRouteItem>, + new: Option<ServiceRouteItem>, + ) { + if let Some(new) = new { + // Create service route record or update an old one + self.service_routes + .entry(new.plugin_name) + .or_insert_with(HashMap::default) + .entry(new.plugin_version) + .or_insert_with(HashMap::default) + .entry(new.service_name) + .or_insert_with(HashMap::default) + .insert(new.instance_name.into(), new.poison); + } else if let Some(old) = old { + // Delete a service route record + let _: Option<bool> = self + .service_routes + .get_mut(&old.plugin_name) + .and_then(|m| m.get_mut(&old.plugin_version)) + .and_then(|m| m.get_mut(&old.service_name)) + .and_then(|m| m.remove(&*old.instance_name)); + } else { + unreachable!() + } + } +} + +/// Value of this type is returned from [`TopologyCacheMutable::check_service_route`]. 
+pub enum ServiceRouteCheck { + Ok, + RoutePoisoned, + ServiceNotEnabled, +} + +#[inline(always)] +fn once_cell_from_option<T>(v: Option<T>) -> OnceCell<T> +where + T: std::fmt::Debug, +{ + let res = OnceCell::new(); + if let Some(v) = v { + res.set(v).expect("was empty"); + } + res +} + +#[inline(always)] +fn once_cell_replace<T>(cell: &mut OnceCell<T>, new: T) +where + T: std::fmt::Debug, +{ + if let Some(cell) = cell.get_mut() { + *cell = new; + } else { + cell.set(new).expect("was empty"); + } +} diff --git a/src/traft/node.rs b/src/traft/node.rs index 563e858617d3f2cdbd4aa7d3a9bd54ec94bb66b0..649d8942c0e6ba74501a9571e684b12f0f6c688e 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -36,6 +36,7 @@ use crate::storage::{local_schema_version, set_local_schema_version}; use crate::storage::{Clusterwide, ClusterwideTable, PropertyName}; use crate::system_parameter_name; use crate::tlog; +use crate::topology_cache::TopologyCache; use crate::traft; use crate::traft::error::Error; use crate::traft::network::WorkerOptions; @@ -56,7 +57,6 @@ use ::raft::prelude as raft; use ::raft::storage::Storage as _; use ::raft::Error as RaftError; use ::raft::StateRole as RaftStateRole; -use ::raft::StorageError; use ::raft::INVALID_ID; use ::tarantool::error::BoxError; use ::tarantool::error::TarantoolErrorCode; @@ -153,6 +153,7 @@ pub struct Node { node_impl: Rc<Mutex<NodeImpl>>, pub(crate) storage: Clusterwide, + pub(crate) topology_cache: Rc<TopologyCache>, pub(crate) raft_storage: RaftSpaceAccess, pub(crate) main_loop: MainLoop, pub(crate) governor_loop: governor::Loop, @@ -211,6 +212,7 @@ impl Node { raft_storage.clone(), plugin_manager.clone(), )?; + let topology_cache = node_impl.topology_cache.clone(); let raft_id = node_impl.raft_id(); let status = node_impl.status.subscribe(); @@ -240,6 +242,7 @@ impl Node { ), pool, node_impl, + topology_cache, storage, raft_storage, status, @@ -447,6 +450,7 @@ pub(crate) struct NodeImpl { pub read_state_wakers: HashMap<LogicalClock, oneshot::Sender<RaftIndex>>, joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>, storage: Clusterwide, + topology_cache: Rc<TopologyCache>, raft_storage: RaftSpaceAccess, pool: Rc<ConnectionPool>, lc: LogicalClock, @@ -462,15 +466,12 @@ impl NodeImpl { storage: Clusterwide, raft_storage: RaftSpaceAccess, plugin_manager: Rc<PluginManager>, - ) -> Result<Self, RaftError> { - let box_err = |e| StorageError::Other(Box::new(e)); - + ) -> traft::Result<Self> { let raft_id: RaftId = raft_storage - .raft_id() - .map_err(box_err)? + .raft_id()? .expect("raft_id should be set by the time the node is being initialized"); - let applied: RaftIndex = raft_storage.applied().map_err(box_err)?; - let term: RaftTerm = raft_storage.term().map_err(box_err)?; + let applied: RaftIndex = raft_storage.applied()?; + let term: RaftTerm = raft_storage.term()?; let lc = { let gen = raft_storage.gen().unwrap() + 1; raft_storage.persist_gen(gen).unwrap(); @@ -498,11 +499,14 @@ impl NodeImpl { }); let (applied, _) = watch::channel(applied); + let topology_cache = Rc::new(TopologyCache::load(&storage, raft_id)?); + Ok(Self { raw_node, read_state_wakers: Default::default(), joint_state_latch: KVCell::new(), storage, + topology_cache, raft_storage, instance_reachability: pool.instance_reachability.clone(), pool, @@ -789,7 +793,13 @@ impl NodeImpl { // // TODO: merge this into `do_dml` once `box_tuple_extract_key` is fixed. 
let old = match space { - Ok(s @ (ClusterwideTable::Property | ClusterwideTable::Instance)) => { + Ok( + s @ (ClusterwideTable::Property + | ClusterwideTable::Instance + | ClusterwideTable::Replicaset + | ClusterwideTable::Tier + | ClusterwideTable::ServiceRouteTable), + ) => { let s = space_by_id(s.id()).expect("system space must exist"); match &op { // There may be no previous version for inserts. @@ -828,31 +838,68 @@ impl NodeImpl { return true; }; - let initiator = op.initiator(); - let initiator_def = user_by_id(initiator).expect("user must exist"); - // FIXME: all of this should be done only after the transaction is committed // See <https://git.picodata.io/core/picodata/-/issues/1149> - if let Some(new) = &new { - // Dml::Delete mandates that new tuple is None. - assert!(!matches!(op, Dml::Delete { .. })); + match space { + ClusterwideTable::Instance => { + let old = old + .as_ref() + .map(|x| x.decode().expect("schema upgrade not supported yet")); + + let new = new + .as_ref() + .map(|x| x.decode().expect("format was already verified")); - // Handle insert, replace, update in _pico_instance - if space == ClusterwideTable::Instance { - let old: Option<Instance> = - old.as_ref().map(|x| x.decode().expect("must be Instance")); + // Handle insert, replace, update in _pico_instance + if let Some(new) = &new { + // Dml::Delete mandates that new tuple is None. + assert!(!matches!(op, Dml::Delete { .. })); - let new: Instance = new - .decode() - .expect("tuple already passed format verification"); + let initiator = op.initiator(); + let initiator_def = user_by_id(initiator).expect("user must exist"); - do_audit_logging_for_instance_update(old.as_ref(), &new, &initiator_def); + do_audit_logging_for_instance_update(old.as_ref(), new, &initiator_def); - if has_states!(new, Expelled -> *) && new.raft_id == self.raft_id() { - // cannot exit during a transaction - *expelled = true; + if has_states!(new, Expelled -> *) && new.raft_id == self.raft_id() { + // cannot exit during a transaction + *expelled = true; + } } + + self.topology_cache.update_instance(old, new); + } + + ClusterwideTable::Replicaset => { + let old = old + .as_ref() + .map(|x| x.decode().expect("schema upgrade not supported yet")); + let new = new + .as_ref() + .map(|x| x.decode().expect("format was already verified")); + self.topology_cache.update_replicaset(old, new); } + + ClusterwideTable::Tier => { + let old = old + .as_ref() + .map(|x| x.decode().expect("schema upgrade not supported yet")); + let new = new + .as_ref() + .map(|x| x.decode().expect("format was already verified")); + self.topology_cache.update_tier(old, new); + } + + ClusterwideTable::ServiceRouteTable => { + let old = old + .as_ref() + .map(|x| x.decode().expect("schema upgrade not supported yet")); + let new = new + .as_ref() + .map(|x| x.decode().expect("format was already verified")); + self.topology_cache.update_service_route(old, new); + } + + _ => {} } true @@ -2129,6 +2176,7 @@ impl NodeImpl { if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() { let mut new_term = None; let mut new_applied = None; + let mut received_snapshot = false; if let Err(e) = transaction(|| -> Result<(), Error> { self.main_loop_status("persisting hard state, entries and/or snapshot"); @@ -2170,6 +2218,7 @@ impl NodeImpl { self.storage .apply_snapshot_data(&snapshot_data, is_master)?; new_applied = Some(meta.index); + received_snapshot = true; } // TODO: As long as the snapshot was sent to us in response to @@ -2201,6 +2250,19 @@ impl NodeImpl 
{ .expect("applied shouldn't ever be borrowed across yields"); } + if received_snapshot { + // Need to reload the whole topology cache. We could be more + // clever about it and only update the records which changed, + // but at the moment this would require doing a full scan and + // comparing each record, which is no better than full reload. + // A better solution would be to store a raft index in each + // tuple of each global table, then we would only need to check + // if the index changed, but we don't have that.. + self.topology_cache + .full_reload(&self.storage) + .expect("schema upgrade not supported yet"); + } + if hard_state.is_some() { crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE"); } diff --git a/src/vshard.rs b/src/vshard.rs index 3511a2294c5563a6ec286774385b0d6e461fc850..f049a699f12ee0bc1f7e2d0a8d715a49cbe7bc1f 100644 --- a/src/vshard.rs +++ b/src/vshard.rs @@ -17,10 +17,14 @@ use std::collections::HashMap; use tarantool::space::SpaceId; use tarantool::tlua; +/// This function **never yields** pub fn get_replicaset_priority_list( tier: &str, replicaset_uuid: &str, ) -> Result<Vec<InstanceName>, Error> { + #[cfg(debug_assertions)] + let _guard = tarantool::fiber::safety::NoYieldsGuard::new(); + let lua = tarantool::lua_state(); let pico: tlua::LuaTable<_> = lua .get("pico") @@ -29,13 +33,20 @@ pub fn get_replicaset_priority_list( let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_priority_list")?; let res = func.call_with_args((tier, replicaset_uuid)); if res.is_err() { - check_tier_exists(tier)?; + // Check if tier exists, return corresponding error in that case + node::global() + .expect("initilized by this point") + .topology_cache + .get() + .tier_by_name(tier)?; } res.map_err(|e| tlua::LuaError::from(e).into()) } /// Returns the replicaset uuid and an array of replicas in descending priority /// order. +/// +/// This function **may yield** if vshard needs to update it's bucket mapping. pub fn get_replicaset_uuid_by_bucket_id(tier: &str, bucket_id: u64) -> Result<String, Error> { let max_bucket_id = PicodataConfig::total_bucket_count(); if bucket_id < 1 || bucket_id > max_bucket_id { @@ -51,27 +62,16 @@ pub fn get_replicaset_uuid_by_bucket_id(tier: &str, bucket_id: u64) -> Result<St let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_uuid_by_bucket_id")?; let res = func.call_with_args((tier, bucket_id)); if res.is_err() { - check_tier_exists(tier)?; + // Check if tier exists, return corresponding error in that case + node::global() + .expect("initilized by this point") + .topology_cache + .get() + .tier_by_name(tier)?; } res.map_err(|e| tlua::LuaError::from(e).into()) } -#[inline(always)] -fn check_tier_exists(tier: &str) -> Result<()> { - // Check if tier exists, return corresponding error in that case - if node::global() - .expect("initilized by this point") - .storage - .tiers - .by_name(tier)? - .is_none() - { - return Err(Error::NoSuchTier(tier.into())); - } - - Ok(()) -} - #[rustfmt::skip] #[derive(serde::Serialize, serde::Deserialize)] #[derive(Default, Clone, Debug, PartialEq, tlua::PushInto, tlua::Push, tlua::LuaRead)]