From f4d66cabdcec9ed3e9284709509f847567d3a8ec Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Fri, 6 Dec 2024 20:04:05 +0300 Subject: [PATCH] refactor: simplify code in resolve_rpc_target --- src/plugin/rpc/client.rs | 284 ++++++++++++++++----------------------- src/vshard.rs | 32 ++++- 2 files changed, 145 insertions(+), 171 deletions(-) diff --git a/src/plugin/rpc/client.rs b/src/plugin/rpc/client.rs index e51dc01df1..ccaaab1784 100644 --- a/src/plugin/rpc/client.rs +++ b/src/plugin/rpc/client.rs @@ -189,110 +189,126 @@ pub(crate) fn encode_request_arguments( Ok(()) } -fn get_replicaset_uuid_by_bucket_id( - tier_name: &str, - bucket_id: u64, - node: &Node, -) -> Result<String, Error> { - let res = vshard::get_replicaset_uuid_by_bucket_id(tier_name, bucket_id); - if res.is_err() && node.storage.tiers.by_name(tier_name)?.is_none() { - return Err(Error::NoSuchTier(tier_name.into())); - } - - res -} - -fn get_instance_name_of_master_of_replicaset( - tier_name: &str, - bucket_id: u64, - node: &Node, -) -> Result<InstanceName, Error> { - let replicaset_uuid = get_replicaset_uuid_by_bucket_id(tier_name, bucket_id, node)?; - let tuple = node.storage.replicasets.by_uuid_raw(&replicaset_uuid)?; - let Some(master_name) = tuple - .field(Replicaset::FIELD_TARGET_MASTER_NAME) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into()); - }; - - Ok(master_name) -} - fn resolve_rpc_target( - ident: &PluginIdentifier, + plugin: &PluginIdentifier, service: &str, target: &FfiSafeRpcTargetSpecifier, node: &Node, ) -> Result<InstanceName, Error> { use FfiSafeRpcTargetSpecifier as Target; - let mut instance_name = None; - let mut replicaset_tuple = None; + let mut to_master_chosen = false; + let mut tier_and_bucket_id = None; + let mut by_replicaset_name = None; + let mut tier_name_owned; match target { - Target::InstanceName(iid) => { + Target::InstanceName(name) => { + // + // Request to a specific instance, single candidate + // // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call - instance_name = Some(InstanceName::from(unsafe { iid.as_str() })); + let instance_name = InstanceName::from(unsafe { name.as_str() }); + + // A single instance was chosen + check_route_to_instance(node, plugin, service, &instance_name)?; + return Ok(instance_name); } &Target::Replicaset { replicaset_name, - to_master: true, + to_master, } => { // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call - let replicaset_name = unsafe { replicaset_name.as_str() }; - let tuple = node.storage.replicasets.get_raw(replicaset_name)?; - let Some(master_name) = tuple - .field(Replicaset::FIELD_TARGET_MASTER_NAME) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into()); - }; - instance_name = Some(master_name); - replicaset_tuple = Some(tuple); + let name = unsafe { replicaset_name.as_str() }; + to_master_chosen = to_master; + by_replicaset_name = Some(name); } - &Target::BucketId { bucket_id, - to_master: true, + to_master, } => { - let current_instance_tier = node + tier_name_owned = node .raft_storage .tier()? .expect("storage for instance should exists"); - - let master_name = - get_instance_name_of_master_of_replicaset(¤t_instance_tier, bucket_id, node)?; - instance_name = Some(master_name); + let tier = &*tier_name_owned; + to_master_chosen = to_master; + tier_and_bucket_id = Some((tier, bucket_id)); } - &Target::TierAndBucketId { - to_master: true, + to_master, tier, bucket_id, } => { // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call let tier = unsafe { tier.as_str() }; - let master_name = get_instance_name_of_master_of_replicaset(tier, bucket_id, node)?; - instance_name = Some(master_name); + to_master_chosen = to_master; + tier_and_bucket_id = Some((tier, bucket_id)); } + Target::Any => {} + } - // These cases are handled below - #[rustfmt::skip] - Target::BucketId { to_master: false, .. } - | Target::TierAndBucketId { to_master: false, .. } - | Target::Replicaset { to_master: false, .. } - | Target::Any => {} + let mut by_bucket_id = None; + let mut tier_and_replicaset_uuid = None; + let mut replicaset_tuple = None; + let mut replicaset_uuid_owned; + if let Some((tier, bucket_id)) = tier_and_bucket_id { + replicaset_uuid_owned = vshard::get_replicaset_uuid_by_bucket_id(tier, bucket_id)?; + let uuid = &*replicaset_uuid_owned; + by_bucket_id = Some((tier, bucket_id, uuid)); + tier_and_replicaset_uuid = Some((tier, uuid)); } - if let Some(instance_name) = instance_name { + // + // Request to replicaset master, single candidate + // + if to_master_chosen { + if let Some(replicaset_name) = by_replicaset_name { + let tuple = node.storage.replicasets.get_raw(replicaset_name)?; + replicaset_tuple = Some(tuple); + } + + if let Some((_, _, replicaset_uuid)) = by_bucket_id { + let tuple = node.storage.replicasets.by_uuid_raw(replicaset_uuid)?; + replicaset_tuple = Some(tuple); + } + + let tuple = replicaset_tuple.expect("set in one of ifs above"); + let Some(master_name) = tuple + .field(Replicaset::FIELD_TARGET_MASTER_NAME) + .map_err(IntoBoxError::into_box_error)? + else { + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into()); + }; + // A single instance was chosen - check_route_to_instance(node, ident, service, &instance_name)?; - return Ok(instance_name); - } else { - // Need to pick an instance from a group, fallthrough + check_route_to_instance(node, plugin, service, &master_name)?; + return Ok(master_name); + } + + if let Some(replicaset_name) = by_replicaset_name { + let tuple = node.storage.replicasets.get_raw(replicaset_name)?; + let Some(found_replicaset_uuid) = tuple + .field(Replicaset::FIELD_REPLICASET_UUID) + .map_err(IntoBoxError::into_box_error)? + else { + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into()); + }; + replicaset_uuid_owned = found_replicaset_uuid; + + let Some(found_tier) = tuple + .field::<'_, String>(Replicaset::FIELD_TIER) + .map_err(IntoBoxError::into_box_error)? + else { + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into()); + }; + tier_name_owned = found_tier; + + replicaset_tuple = Some(tuple); + tier_and_replicaset_uuid = Some((&tier_name_owned, &replicaset_uuid_owned)); } let my_instance_name = node @@ -300,99 +316,31 @@ fn resolve_rpc_target( .instance_name()? .expect("should be persisted at this point"); + // Get list of all possible targets let mut all_instances_with_service = node .storage .service_route_table - .get_available_instances(ident, service)?; + .get_available_instances(plugin, service)?; + // Get list of all possible targets filter_instances_by_state(node, &mut all_instances_with_service)?; #[rustfmt::skip] if all_instances_with_service.is_empty() { - if node.storage.services.get(ident, service)?.is_none() { + if node.storage.services.get(plugin, service)?.is_none() { return Err(BoxError::new(ErrorCode::NoSuchService, "service '{plugin}.{service}' not found").into()); } else { - return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{ident}.{service}' is not started on any instance")).into()); + return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not started on any instance")).into()); } }; - let mut tier_and_replicaset_uuid = None; - - match target { - #[rustfmt::skip] - Target::InstanceName { .. } - | Target::Replicaset { to_master: true, .. } - | Target::TierAndBucketId { to_master: true, .. } - | Target::BucketId { to_master: true, .. } => unreachable!("handled above"), - - &Target::Replicaset { - replicaset_name, - to_master: false, - } => { - // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call - let replicaset_name = unsafe { replicaset_name.as_str() }; - let tuple = node.storage.replicasets.get_raw(replicaset_name)?; - let Some(found_replicaset_uuid) = tuple - .field(Replicaset::FIELD_REPLICASET_UUID) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into()); - }; - - let Some(found_tier) = tuple - .field::<'_, String>(Replicaset::FIELD_TIER) - .map_err(IntoBoxError::into_box_error)? - else { - #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into()); - }; - - tier_and_replicaset_uuid = Some((found_tier, found_replicaset_uuid)); - replicaset_tuple = Some(tuple); - } - - &Target::BucketId { - bucket_id, - to_master: false, - } => { - // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call - let tier = node - .raft_storage - .tier()? - .expect("storage for instance should exists"); - - let replicaset_uuid = get_replicaset_uuid_by_bucket_id(&tier, bucket_id, node)?; - tier_and_replicaset_uuid = Some((tier, replicaset_uuid)); - } - - &Target::TierAndBucketId { - tier, - bucket_id, - to_master: false, - } => { - // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call - let tier = unsafe { tier.as_str().to_string() }; - let replicaset_uuid = get_replicaset_uuid_by_bucket_id(&tier, bucket_id, node)?; - tier_and_replicaset_uuid = Some((tier, replicaset_uuid)); - } - - &Target::Any => {} - } - + // + // Request to any replica in replicaset, multiple candidates + // if let Some((tier, replicaset_uuid)) = tier_and_replicaset_uuid { // Need to pick a replica from given replicaset - let replicas = match vshard::get_replicaset_priority_list(&tier, &replicaset_uuid) { - Ok(replicas) => replicas, - Err(err) => { - if node.storage.tiers.by_name(&tier)?.is_none() { - return Err(Error::NoSuchTier(tier)); - } - - return Err(err); - } - }; + let replicas = vshard::get_replicaset_priority_list(tier, replicaset_uuid)?; let mut skipped_self = false; @@ -414,24 +362,26 @@ fn resolve_rpc_target( return Ok(my_instance_name); } - check_replicaset_is_not_expelled(node, &replicaset_uuid, replicaset_tuple)?; + check_replicaset_is_not_expelled(node, replicaset_uuid, replicaset_tuple)?; #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {ident}.{service}")).into()); - } else { - // Need to pick any instance with the given plugin.service - let mut candidates = all_instances_with_service; - - // TODO: find a better strategy then just the random one - let random_index = rand::random::<usize>() % candidates.len(); - let mut instance_name = std::mem::take(&mut candidates[random_index]); - if instance_name == my_instance_name && candidates.len() > 1 { - // Prefer someone else instead of self - let index = (random_index + 1) % candidates.len(); - instance_name = std::mem::take(&mut candidates[index]); - } - return Ok(instance_name); + return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {plugin}.{service}")).into()); + } + + // + // Request to any instance with given service, multiple candidates + // + let mut candidates = all_instances_with_service; + + // TODO: find a better strategy then just the random one + let random_index = rand::random::<usize>() % candidates.len(); + let mut instance_name = std::mem::take(&mut candidates[random_index]); + if instance_name == my_instance_name && candidates.len() > 1 { + // Prefer someone else instead of self + let index = (random_index + 1) % candidates.len(); + instance_name = std::mem::take(&mut candidates[index]); } + return Ok(instance_name); } fn filter_instances_by_state( @@ -454,7 +404,7 @@ fn filter_instances_by_state( fn check_route_to_instance( node: &Node, - ident: &PluginIdentifier, + plugin: &PluginIdentifier, service: &str, instance_name: &InstanceName, ) -> Result<(), Error> { @@ -470,13 +420,13 @@ fn check_route_to_instance( } let res = node.storage.service_route_table.get_raw(&ServiceRouteKey { instance_name, - plugin_name: &ident.name, - plugin_version: &ident.version, + plugin_name: &plugin.name, + plugin_version: &plugin.version, service_name: service, })?; #[rustfmt::skip] let Some(tuple) = res else { - return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{ident}.{service}' is not running on {instance_name}")).into()); + return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not running on {instance_name}")).into()); }; let res = tuple .field(ServiceRouteItem::FIELD_POISON) @@ -487,7 +437,7 @@ fn check_route_to_instance( }; if is_poisoned { #[rustfmt::skip] - return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{ident}.{service}' is poisoned on {instance_name}")).into()); + return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{plugin}.{service}' is poisoned on {instance_name}")).into()); } Ok(()) } diff --git a/src/vshard.rs b/src/vshard.rs index d2cd56f9fe..3511a2294c 100644 --- a/src/vshard.rs +++ b/src/vshard.rs @@ -10,7 +10,9 @@ use crate::storage::Clusterwide; use crate::storage::ToEntryIter as _; use crate::storage::TABLE_ID_BUCKET; use crate::traft::error::Error; +use crate::traft::node; use crate::traft::RaftId; +use crate::traft::Result; use std::collections::HashMap; use tarantool::space::SpaceId; use tarantool::tlua; @@ -25,8 +27,11 @@ pub fn get_replicaset_priority_list( .ok_or_else(|| Error::other("pico lua module disappeared"))?; let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_priority_list")?; - func.call_with_args((tier, replicaset_uuid)) - .map_err(|e| tlua::LuaError::from(e).into()) + let res = func.call_with_args((tier, replicaset_uuid)); + if res.is_err() { + check_tier_exists(tier)?; + } + res.map_err(|e| tlua::LuaError::from(e).into()) } /// Returns the replicaset uuid and an array of replicas in descending priority @@ -44,8 +49,27 @@ pub fn get_replicaset_uuid_by_bucket_id(tier: &str, bucket_id: u64) -> Result<St .ok_or_else(|| Error::other("pico lua module disappeared"))?; let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_uuid_by_bucket_id")?; - func.call_with_args((tier, bucket_id)) - .map_err(|e| tlua::LuaError::from(e).into()) + let res = func.call_with_args((tier, bucket_id)); + if res.is_err() { + check_tier_exists(tier)?; + } + res.map_err(|e| tlua::LuaError::from(e).into()) +} + +#[inline(always)] +fn check_tier_exists(tier: &str) -> Result<()> { + // Check if tier exists, return corresponding error in that case + if node::global() + .expect("initilized by this point") + .storage + .tiers + .by_name(tier)? + .is_none() + { + return Err(Error::NoSuchTier(tier.into())); + } + + Ok(()) } #[rustfmt::skip] -- GitLab