Skip to content
Snippets Groups Projects
Commit f4d66cab authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: simplify code in resolve_rpc_target

parent abec4647
No related branches found
No related tags found
1 merge request!1491perf: introduce TopologyCache
......@@ -189,110 +189,126 @@ pub(crate) fn encode_request_arguments(
Ok(())
}
fn get_replicaset_uuid_by_bucket_id(
tier_name: &str,
bucket_id: u64,
node: &Node,
) -> Result<String, Error> {
let res = vshard::get_replicaset_uuid_by_bucket_id(tier_name, bucket_id);
if res.is_err() && node.storage.tiers.by_name(tier_name)?.is_none() {
return Err(Error::NoSuchTier(tier_name.into()));
}
res
}
fn get_instance_name_of_master_of_replicaset(
tier_name: &str,
bucket_id: u64,
node: &Node,
) -> Result<InstanceName, Error> {
let replicaset_uuid = get_replicaset_uuid_by_bucket_id(tier_name, bucket_id, node)?;
let tuple = node.storage.replicasets.by_uuid_raw(&replicaset_uuid)?;
let Some(master_name) = tuple
.field(Replicaset::FIELD_TARGET_MASTER_NAME)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
};
Ok(master_name)
}
fn resolve_rpc_target(
ident: &PluginIdentifier,
plugin: &PluginIdentifier,
service: &str,
target: &FfiSafeRpcTargetSpecifier,
node: &Node,
) -> Result<InstanceName, Error> {
use FfiSafeRpcTargetSpecifier as Target;
let mut instance_name = None;
let mut replicaset_tuple = None;
let mut to_master_chosen = false;
let mut tier_and_bucket_id = None;
let mut by_replicaset_name = None;
let mut tier_name_owned;
match target {
Target::InstanceName(iid) => {
Target::InstanceName(name) => {
//
// Request to a specific instance, single candidate
//
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
instance_name = Some(InstanceName::from(unsafe { iid.as_str() }));
let instance_name = InstanceName::from(unsafe { name.as_str() });
// A single instance was chosen
check_route_to_instance(node, plugin, service, &instance_name)?;
return Ok(instance_name);
}
&Target::Replicaset {
replicaset_name,
to_master: true,
to_master,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let replicaset_name = unsafe { replicaset_name.as_str() };
let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
let Some(master_name) = tuple
.field(Replicaset::FIELD_TARGET_MASTER_NAME)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
};
instance_name = Some(master_name);
replicaset_tuple = Some(tuple);
let name = unsafe { replicaset_name.as_str() };
to_master_chosen = to_master;
by_replicaset_name = Some(name);
}
&Target::BucketId {
bucket_id,
to_master: true,
to_master,
} => {
let current_instance_tier = node
tier_name_owned = node
.raft_storage
.tier()?
.expect("storage for instance should exists");
let master_name =
get_instance_name_of_master_of_replicaset(&current_instance_tier, bucket_id, node)?;
instance_name = Some(master_name);
let tier = &*tier_name_owned;
to_master_chosen = to_master;
tier_and_bucket_id = Some((tier, bucket_id));
}
&Target::TierAndBucketId {
to_master: true,
to_master,
tier,
bucket_id,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let tier = unsafe { tier.as_str() };
let master_name = get_instance_name_of_master_of_replicaset(tier, bucket_id, node)?;
instance_name = Some(master_name);
to_master_chosen = to_master;
tier_and_bucket_id = Some((tier, bucket_id));
}
Target::Any => {}
}
// These cases are handled below
#[rustfmt::skip]
Target::BucketId { to_master: false, .. }
| Target::TierAndBucketId { to_master: false, .. }
| Target::Replicaset { to_master: false, .. }
| Target::Any => {}
let mut by_bucket_id = None;
let mut tier_and_replicaset_uuid = None;
let mut replicaset_tuple = None;
let mut replicaset_uuid_owned;
if let Some((tier, bucket_id)) = tier_and_bucket_id {
replicaset_uuid_owned = vshard::get_replicaset_uuid_by_bucket_id(tier, bucket_id)?;
let uuid = &*replicaset_uuid_owned;
by_bucket_id = Some((tier, bucket_id, uuid));
tier_and_replicaset_uuid = Some((tier, uuid));
}
if let Some(instance_name) = instance_name {
//
// Request to replicaset master, single candidate
//
if to_master_chosen {
if let Some(replicaset_name) = by_replicaset_name {
let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
replicaset_tuple = Some(tuple);
}
if let Some((_, _, replicaset_uuid)) = by_bucket_id {
let tuple = node.storage.replicasets.by_uuid_raw(replicaset_uuid)?;
replicaset_tuple = Some(tuple);
}
let tuple = replicaset_tuple.expect("set in one of ifs above");
let Some(master_name) = tuple
.field(Replicaset::FIELD_TARGET_MASTER_NAME)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
};
// A single instance was chosen
check_route_to_instance(node, ident, service, &instance_name)?;
return Ok(instance_name);
} else {
// Need to pick an instance from a group, fallthrough
check_route_to_instance(node, plugin, service, &master_name)?;
return Ok(master_name);
}
if let Some(replicaset_name) = by_replicaset_name {
let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
let Some(found_replicaset_uuid) = tuple
.field(Replicaset::FIELD_REPLICASET_UUID)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into());
};
replicaset_uuid_owned = found_replicaset_uuid;
let Some(found_tier) = tuple
.field::<'_, String>(Replicaset::FIELD_TIER)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into());
};
tier_name_owned = found_tier;
replicaset_tuple = Some(tuple);
tier_and_replicaset_uuid = Some((&tier_name_owned, &replicaset_uuid_owned));
}
let my_instance_name = node
......@@ -300,99 +316,31 @@ fn resolve_rpc_target(
.instance_name()?
.expect("should be persisted at this point");
// Get list of all possible targets
let mut all_instances_with_service = node
.storage
.service_route_table
.get_available_instances(ident, service)?;
.get_available_instances(plugin, service)?;
// Get list of all possible targets
filter_instances_by_state(node, &mut all_instances_with_service)?;
#[rustfmt::skip]
if all_instances_with_service.is_empty() {
if node.storage.services.get(ident, service)?.is_none() {
if node.storage.services.get(plugin, service)?.is_none() {
return Err(BoxError::new(ErrorCode::NoSuchService, "service '{plugin}.{service}' not found").into());
} else {
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{ident}.{service}' is not started on any instance")).into());
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not started on any instance")).into());
}
};
let mut tier_and_replicaset_uuid = None;
match target {
#[rustfmt::skip]
Target::InstanceName { .. }
| Target::Replicaset { to_master: true, .. }
| Target::TierAndBucketId { to_master: true, .. }
| Target::BucketId { to_master: true, .. } => unreachable!("handled above"),
&Target::Replicaset {
replicaset_name,
to_master: false,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let replicaset_name = unsafe { replicaset_name.as_str() };
let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
let Some(found_replicaset_uuid) = tuple
.field(Replicaset::FIELD_REPLICASET_UUID)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into());
};
let Some(found_tier) = tuple
.field::<'_, String>(Replicaset::FIELD_TIER)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into());
};
tier_and_replicaset_uuid = Some((found_tier, found_replicaset_uuid));
replicaset_tuple = Some(tuple);
}
&Target::BucketId {
bucket_id,
to_master: false,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let tier = node
.raft_storage
.tier()?
.expect("storage for instance should exists");
let replicaset_uuid = get_replicaset_uuid_by_bucket_id(&tier, bucket_id, node)?;
tier_and_replicaset_uuid = Some((tier, replicaset_uuid));
}
&Target::TierAndBucketId {
tier,
bucket_id,
to_master: false,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let tier = unsafe { tier.as_str().to_string() };
let replicaset_uuid = get_replicaset_uuid_by_bucket_id(&tier, bucket_id, node)?;
tier_and_replicaset_uuid = Some((tier, replicaset_uuid));
}
&Target::Any => {}
}
//
// Request to any replica in replicaset, multiple candidates
//
if let Some((tier, replicaset_uuid)) = tier_and_replicaset_uuid {
// Need to pick a replica from given replicaset
let replicas = match vshard::get_replicaset_priority_list(&tier, &replicaset_uuid) {
Ok(replicas) => replicas,
Err(err) => {
if node.storage.tiers.by_name(&tier)?.is_none() {
return Err(Error::NoSuchTier(tier));
}
return Err(err);
}
};
let replicas = vshard::get_replicaset_priority_list(tier, replicaset_uuid)?;
let mut skipped_self = false;
......@@ -414,24 +362,26 @@ fn resolve_rpc_target(
return Ok(my_instance_name);
}
check_replicaset_is_not_expelled(node, &replicaset_uuid, replicaset_tuple)?;
check_replicaset_is_not_expelled(node, replicaset_uuid, replicaset_tuple)?;
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {ident}.{service}")).into());
} else {
// Need to pick any instance with the given plugin.service
let mut candidates = all_instances_with_service;
// TODO: find a better strategy then just the random one
let random_index = rand::random::<usize>() % candidates.len();
let mut instance_name = std::mem::take(&mut candidates[random_index]);
if instance_name == my_instance_name && candidates.len() > 1 {
// Prefer someone else instead of self
let index = (random_index + 1) % candidates.len();
instance_name = std::mem::take(&mut candidates[index]);
}
return Ok(instance_name);
return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {plugin}.{service}")).into());
}
//
// Request to any instance with given service, multiple candidates
//
let mut candidates = all_instances_with_service;
// TODO: find a better strategy then just the random one
let random_index = rand::random::<usize>() % candidates.len();
let mut instance_name = std::mem::take(&mut candidates[random_index]);
if instance_name == my_instance_name && candidates.len() > 1 {
// Prefer someone else instead of self
let index = (random_index + 1) % candidates.len();
instance_name = std::mem::take(&mut candidates[index]);
}
return Ok(instance_name);
}
fn filter_instances_by_state(
......@@ -454,7 +404,7 @@ fn filter_instances_by_state(
fn check_route_to_instance(
node: &Node,
ident: &PluginIdentifier,
plugin: &PluginIdentifier,
service: &str,
instance_name: &InstanceName,
) -> Result<(), Error> {
......@@ -470,13 +420,13 @@ fn check_route_to_instance(
}
let res = node.storage.service_route_table.get_raw(&ServiceRouteKey {
instance_name,
plugin_name: &ident.name,
plugin_version: &ident.version,
plugin_name: &plugin.name,
plugin_version: &plugin.version,
service_name: service,
})?;
#[rustfmt::skip]
let Some(tuple) = res else {
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{ident}.{service}' is not running on {instance_name}")).into());
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not running on {instance_name}")).into());
};
let res = tuple
.field(ServiceRouteItem::FIELD_POISON)
......@@ -487,7 +437,7 @@ fn check_route_to_instance(
};
if is_poisoned {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{ident}.{service}' is poisoned on {instance_name}")).into());
return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{plugin}.{service}' is poisoned on {instance_name}")).into());
}
Ok(())
}
......
......@@ -10,7 +10,9 @@ use crate::storage::Clusterwide;
use crate::storage::ToEntryIter as _;
use crate::storage::TABLE_ID_BUCKET;
use crate::traft::error::Error;
use crate::traft::node;
use crate::traft::RaftId;
use crate::traft::Result;
use std::collections::HashMap;
use tarantool::space::SpaceId;
use tarantool::tlua;
......@@ -25,8 +27,11 @@ pub fn get_replicaset_priority_list(
.ok_or_else(|| Error::other("pico lua module disappeared"))?;
let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_priority_list")?;
func.call_with_args((tier, replicaset_uuid))
.map_err(|e| tlua::LuaError::from(e).into())
let res = func.call_with_args((tier, replicaset_uuid));
if res.is_err() {
check_tier_exists(tier)?;
}
res.map_err(|e| tlua::LuaError::from(e).into())
}
/// Returns the replicaset uuid and an array of replicas in descending priority
......@@ -44,8 +49,27 @@ pub fn get_replicaset_uuid_by_bucket_id(tier: &str, bucket_id: u64) -> Result<St
.ok_or_else(|| Error::other("pico lua module disappeared"))?;
let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_uuid_by_bucket_id")?;
func.call_with_args((tier, bucket_id))
.map_err(|e| tlua::LuaError::from(e).into())
let res = func.call_with_args((tier, bucket_id));
if res.is_err() {
check_tier_exists(tier)?;
}
res.map_err(|e| tlua::LuaError::from(e).into())
}
#[inline(always)]
fn check_tier_exists(tier: &str) -> Result<()> {
// Check if tier exists, return corresponding error in that case
if node::global()
.expect("initilized by this point")
.storage
.tiers
.by_name(tier)?
.is_none()
{
return Err(Error::NoSuchTier(tier.into()));
}
Ok(())
}
#[rustfmt::skip]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment