-
Georgy Moshkin authored
client.rs 15.65 KiB
use crate::error_code::ErrorCode;
use crate::has_states;
use crate::instance::InstanceName;
use crate::plugin::{rpc, PluginIdentifier};
use crate::replicaset::Replicaset;
use crate::replicaset::ReplicasetState;
use crate::schema::ServiceRouteItem;
use crate::schema::ServiceRouteKey;
use crate::tlog;
use crate::traft::error::Error;
use crate::traft::network::ConnectionPool;
use crate::traft::node::Node;
use crate::vshard;
use picodata_plugin::transport::context::ContextFieldId;
use picodata_plugin::transport::rpc::client::FfiSafeRpcTargetSpecifier;
use picodata_plugin::util::copy_to_region;
use std::time::Duration;
use tarantool::error::BoxError;
use tarantool::error::Error as TntError;
use tarantool::error::IntoBoxError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::tuple::Tuple;
use tarantool::tuple::TupleBuffer;
use tarantool::tuple::{RawByteBuf, RawBytes};
use tarantool::uuid::Uuid;
////////////////////////////////////////////////////////////////////////////////
// rpc out
////////////////////////////////////////////////////////////////////////////////
/// Unwraps a msgpack `bin` value from an RPC response and copies its payload
/// onto the region allocator.
///
/// `output` must consist of exactly one `bin` header followed by precisely the
/// number of bytes that header announces. Anything else is reported as an
/// `InvalidMsgpack` error.
fn process_rpc_output(mut output: &[u8]) -> Result<&'static [u8], Error> {
    let declared_len = match rmp::decode::read_bin_len(&mut output) {
        Ok(len) => len as usize,
        Err(e) => {
            let message = format!("expected bin: {e}");
            return Err(BoxError::new(TarantoolErrorCode::InvalidMsgpack, message).into());
        }
    };

    // `output` now points just past the bin header.
    let payload_len = output.len();
    if payload_len != declared_len {
        let message = format!("this is weird: {declared_len} != {payload_len}");
        return Err(BoxError::new(TarantoolErrorCode::InvalidMsgpack, message).into());
    }

    Ok(copy_to_region(output)?)
}
/// Returns data allocated on the region allocator (or statically allocated).
pub(crate) fn send_rpc_request(
plugin_identity: &PluginIdentifier,
service: &str,
target: &FfiSafeRpcTargetSpecifier,
path: &str,
input: &[u8],
timeout: f64,
) -> Result<&'static [u8], Error> {
let node = crate::traft::node::global()?;
let pool = &node.plugin_manager.pool;
let timeout = Duration::from_secs_f64(timeout);
let instance_name = resolve_rpc_target(plugin_identity, service, target, node)?;
let my_instance_name = node
.raft_storage
.instance_name()?
.expect("should be persisted at this point");
if path.starts_with('.') {
return call_builtin_stored_proc(pool, path, input, &instance_name, timeout);
}
let mut buffer = Vec::new();
let request_id = Uuid::random();
encode_request_arguments(
&mut buffer,
path,
input,
&request_id,
&plugin_identity.name,
service,
&plugin_identity.version,
)
.expect("can't fail encoding into an array");
// Safe because buffer contains a msgpack array
let args = unsafe { TupleBuffer::from_vec_unchecked(buffer) };
if instance_name == my_instance_name {
let output = rpc::server::proc_rpc_dispatch_impl(args.as_ref().into())?;
return process_rpc_output(output);
};
crate::error_injection!("RPC_NETWORK_ERROR" => return Err(Error::other("injected error")));
tlog!(Debug, "sending plugin RPC request";
"instance_name" => %instance_name,
"request_id" => %request_id,
"path" => path,
);
let future = pool.call_raw(
&instance_name,
crate::proc_name!(rpc::server::proc_rpc_dispatch),
&args,
Some(timeout),
)?;
// FIXME: remove this extra allocation for RawByteBuf
let output: RawByteBuf = fiber::block_on(future)?;
process_rpc_output(&output)
}
/// Invokes one of picodata's builtin stored procedures on `instance_name`.
///
/// `proc` is mapped to its static procedure name via
/// `crate::rpc::to_static_proc_name`. Unknown names yield a `NoSuchFunction`
/// error. `input` is forwarded verbatim as the raw argument payload, and the
/// raw response is copied onto the region allocator.
fn call_builtin_stored_proc(
    pool: &ConnectionPool,
    proc: &str,
    input: &[u8],
    instance_name: &InstanceName,
    timeout: Duration,
) -> Result<&'static [u8], Error> {
    // Call a builtin picodata stored procedure
    let static_name = match crate::rpc::to_static_proc_name(proc) {
        Some(name) => name,
        None => {
            let message = format!("unknown static stored procedure {proc}");
            return Err(BoxError::new(TarantoolErrorCode::NoSuchFunction, message).into());
        }
    };

    let pending = pool.call_raw(
        instance_name,
        static_name,
        RawBytes::new(input),
        Some(timeout),
    )?;

    // FIXME: remove this extra allocation for RawByteBuf
    let response: RawByteBuf = fiber::block_on(pending)?;
    Ok(copy_to_region(&response)?)
}
/// Encodes the arguments of a plugin RPC request, appending them to `buffer`.
///
/// The encoded value is a msgpack array of 3 elements, `[path, input, context]`:
/// - `path` is a msgpack str.
/// - `input` is a msgpack bin.
/// - `context` is a map keyed by [`ContextFieldId`] values, holding the request
///   id, plugin name, service name and plugin version.
///
/// Existing contents of `buffer` are kept; new data is appended.
///
/// # Errors
/// Propagates msgpack encoding errors.
pub(crate) fn encode_request_arguments(
    buffer: &mut Vec<u8>,
    path: &str,
    input: &[u8],
    request_id: &Uuid,
    plugin: &str,
    service: &str,
    plugin_version: &str,
) -> Result<(), TntError> {
    // Reserve an upper bound of the encoded size up front to avoid
    // reallocations while writing. 5 bytes is the largest str/bin header.
    // Context keys are assumed to fit into a 1-byte positive fixint.
    buffer.reserve(
        // array header
        1
        // str path
        + 5 + path.len()
        // bin input
        + 5 + input.len()
        // context header
        + 1
        // key + uuid
        + 1 + 18
        // key + str plugin
        + 1 + 5 + plugin.len()
        // key + str service
        + 1 + 5 + service.len()
        // key + str plugin_version
        + 1 + 5 + plugin_version.len(),
    );

    rmp::encode::write_array_len(buffer, 3)?;

    // Encode path
    rmp::encode::write_str(buffer, path)?;

    // Encode input
    rmp::encode::write_bin(buffer, input)?;

    // Encode context
    {
        rmp::encode::write_map_len(buffer, 4)?;

        // Encode request_id
        rmp::encode::write_uint(buffer, ContextFieldId::RequestId as _)?;
        rmp_serde::encode::write(buffer, request_id)?;

        // Encode plugin name
        rmp::encode::write_uint(buffer, ContextFieldId::PluginName as _)?;
        rmp_serde::encode::write(buffer, plugin)?;

        // Encode service name
        rmp::encode::write_uint(buffer, ContextFieldId::ServiceName as _)?;
        rmp_serde::encode::write(buffer, service)?;

        // Encode plugin_version
        rmp::encode::write_uint(buffer, ContextFieldId::PluginVersion as _)?;
        rmp_serde::encode::write(buffer, plugin_version)?;
    }

    Ok(())
}
/// Picks the instance that should receive a plugin RPC request for
/// `plugin`'s `service`, according to `target`.
///
/// Resolution by target kind:
/// - `InstanceName`: that exact instance, after [`check_route_to_instance`].
/// - `Replicaset` / `BucketId` / `TierAndBucketId` with `to_master` set: the
///   replicaset's `target_master_name`, after [`check_route_to_instance`].
/// - The same kinds without `to_master`: the first replica, in the order
///   returned by `vshard::get_replicaset_priority_list`, that has the service
///   available. Other instances are preferred over the current one, which is
///   used only as a fallback.
/// - `Any`: a random instance with the service available, preferring one other
///   than the current instance.
///
/// `BucketId` resolves the bucket within the current instance's own tier.
///
/// The string pointers inside `target` must remain valid for the duration of
/// this call.
///
/// # Errors
/// Returns an error in these cases:
/// - `NoSuchService`: the service is not defined.
/// - `ServiceNotStarted`: no responsive instance has the service available.
/// - `ServiceNotAvailable`: none of the replicaset's replicas can serve it.
/// - `ReplicasetExpelled`: the replicaset was expelled.
/// - `StorageCorrupted`: a `_pico_replicaset` tuple is missing a field.
/// - Any error from storage lookups, vshard, or the per-instance route checks.
fn resolve_rpc_target(
    plugin: &PluginIdentifier,
    service: &str,
    target: &FfiSafeRpcTargetSpecifier,
    node: &Node,
) -> Result<InstanceName, Error> {
    use FfiSafeRpcTargetSpecifier as Target;

    let mut to_master_chosen = false;
    let mut tier_and_bucket_id = None;
    let mut by_replicaset_name = None;
    // Keeps an owned tier name alive when it has to be fetched from storage,
    // so the `&str` borrows stored in the options can point into it.
    let mut tier_name_owned;

    match target {
        Target::InstanceName(name) => {
            //
            // Request to a specific instance, single candidate
            //

            // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
            let instance_name = InstanceName::from(unsafe { name.as_str() });

            // A single instance was chosen
            check_route_to_instance(node, plugin, service, &instance_name)?;
            return Ok(instance_name);
        }
        &Target::Replicaset {
            replicaset_name,
            to_master,
        } => {
            // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
            let name = unsafe { replicaset_name.as_str() };
            to_master_chosen = to_master;
            by_replicaset_name = Some(name);
        }
        &Target::BucketId {
            bucket_id,
            to_master,
        } => {
            // No tier given: use the current instance's own tier.
            tier_name_owned = node
                .raft_storage
                .tier()?
                .expect("storage for instance should exists");
            let tier = &*tier_name_owned;
            to_master_chosen = to_master;
            tier_and_bucket_id = Some((tier, bucket_id));
        }
        &Target::TierAndBucketId {
            to_master,
            tier,
            bucket_id,
        } => {
            // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
            let tier = unsafe { tier.as_str() };
            to_master_chosen = to_master;
            tier_and_bucket_id = Some((tier, bucket_id));
        }
        Target::Any => {}
    }

    let mut by_bucket_id = None;
    let mut tier_and_replicaset_uuid = None;
    let mut replicaset_tuple = None;
    // Keeps an owned replicaset uuid alive for the borrows stored above.
    let mut replicaset_uuid_owned;

    // Map the bucket id to the replicaset uuid via vshard.
    if let Some((tier, bucket_id)) = tier_and_bucket_id {
        replicaset_uuid_owned = vshard::get_replicaset_uuid_by_bucket_id(tier, bucket_id)?;
        let uuid = &*replicaset_uuid_owned;
        by_bucket_id = Some((tier, bucket_id, uuid));
        tier_and_replicaset_uuid = Some((tier, uuid));
    }

    //
    // Request to replicaset master, single candidate
    //
    if to_master_chosen {
        if let Some(replicaset_name) = by_replicaset_name {
            let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
            replicaset_tuple = Some(tuple);
        }

        if let Some((_, _, replicaset_uuid)) = by_bucket_id {
            let tuple = node.storage.replicasets.by_uuid_raw(replicaset_uuid)?;
            replicaset_tuple = Some(tuple);
        }

        let tuple = replicaset_tuple.expect("set in one of ifs above");
        let Some(master_name) = tuple
            .field(Replicaset::FIELD_TARGET_MASTER_NAME)
            .map_err(IntoBoxError::into_box_error)?
        else {
            #[rustfmt::skip]
            return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
        };

        // A single instance was chosen
        check_route_to_instance(node, plugin, service, &master_name)?;
        return Ok(master_name);
    }

    if let Some(replicaset_name) = by_replicaset_name {
        let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
        let Some(found_replicaset_uuid) = tuple
            .field(Replicaset::FIELD_REPLICASET_UUID)
            .map_err(IntoBoxError::into_box_error)?
        else {
            #[rustfmt::skip]
            return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into());
        };
        replicaset_uuid_owned = found_replicaset_uuid;

        let Some(found_tier) = tuple
            .field::<'_, String>(Replicaset::FIELD_TIER)
            .map_err(IntoBoxError::into_box_error)?
        else {
            #[rustfmt::skip]
            return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into());
        };
        tier_name_owned = found_tier;

        replicaset_tuple = Some(tuple);
        tier_and_replicaset_uuid = Some((&tier_name_owned, &replicaset_uuid_owned));
    }

    let my_instance_name = node
        .raft_storage
        .instance_name()?
        .expect("should be persisted at this point");

    // Get list of all possible targets
    let mut all_instances_with_service = node
        .storage
        .service_route_table
        .get_available_instances(plugin, service)?;

    // Drop instances that are expelled or currently unable to respond
    filter_instances_by_state(node, &mut all_instances_with_service)?;

    #[rustfmt::skip]
    if all_instances_with_service.is_empty() {
        if node.storage.services.get(plugin, service)?.is_none() {
            return Err(BoxError::new(ErrorCode::NoSuchService, format!("service '{plugin}.{service}' not found")).into());
        } else {
            return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not started on any instance")).into());
        }
    };

    //
    // Request to any replica in replicaset, multiple candidates
    //
    if let Some((tier, replicaset_uuid)) = tier_and_replicaset_uuid {
        // Need to pick a replica from given replicaset
        let replicas = vshard::get_replicaset_priority_list(tier, replicaset_uuid)?;

        let mut skipped_self = false;

        // XXX: this shouldn't be a problem if replicasets aren't too big,
        // but if they are we might want to construct a HashSet from candidates
        for instance_name in replicas {
            if my_instance_name == instance_name {
                // Prefer someone else instead of self
                skipped_self = true;
                continue;
            }
            if all_instances_with_service.contains(&instance_name) {
                return Ok(instance_name);
            }
        }

        // In case there's no other suitable candidates, fallback to calling self
        if skipped_self && all_instances_with_service.contains(&my_instance_name) {
            return Ok(my_instance_name);
        }

        check_replicaset_is_not_expelled(node, replicaset_uuid, replicaset_tuple)?;

        #[rustfmt::skip]
        return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {plugin}.{service}")).into());
    }

    //
    // Request to any instance with given service, multiple candidates
    //
    let mut candidates = all_instances_with_service;
    // TODO: find a better strategy then just the random one
    // `candidates` is non-empty here: the empty case returned above.
    let random_index = rand::random::<usize>() % candidates.len();
    let mut instance_name = std::mem::take(&mut candidates[random_index]);
    if instance_name == my_instance_name && candidates.len() > 1 {
        // Prefer someone else instead of self
        let index = (random_index + 1) % candidates.len();
        instance_name = std::mem::take(&mut candidates[index]);
    }

    Ok(instance_name)
}
/// Removes from `instance_names` every instance that has been expelled or is
/// currently unable to respond, based on its record in `node.storage.instances`.
///
/// Removal uses `swap_remove`, so the relative order of the remaining names is
/// not preserved. A storage lookup error is returned immediately, leaving the
/// vector partially filtered.
fn filter_instances_by_state(
    node: &Node,
    instance_names: &mut Vec<InstanceName>,
) -> Result<(), Error> {
    let mut cursor = 0;
    loop {
        let Some(candidate) = instance_names.get(cursor) else {
            break;
        };
        let instance = node.storage.instances.get(candidate)?;
        let unusable = has_states!(instance, Expelled -> *) || !instance.may_respond();
        if !unusable {
            cursor += 1;
            continue;
        }
        // O(1) removal: the last element moves into `cursor` and is checked next.
        instance_names.swap_remove(cursor);
    }
    Ok(())
}
/// Verifies that `instance_name` is a valid destination for an RPC to
/// `plugin`'s `service`.
///
/// The instance must pass all of these checks:
/// - It is not expelled.
/// - It is in a state where it may respond.
/// - It has a `_pico_service_route` entry for this exact plugin, version and
///   service.
/// - That entry is not poisoned.
///
/// Each failed check produces a distinct error code.
fn check_route_to_instance(
    node: &Node,
    plugin: &PluginIdentifier,
    service: &str,
    instance_name: &InstanceName,
) -> Result<(), Error> {
    let instance = node.storage.instances.get(instance_name)?;
    if has_states!(instance, Expelled -> *) {
        let message = format!("instance named '{instance_name}' was expelled");
        return Err(BoxError::new(ErrorCode::InstanceExpelled, message).into());
    }
    if !instance.may_respond() {
        let message = format!("instance with instance_name \"{instance_name}\" can't respond due it's state");
        return Err(BoxError::new(ErrorCode::InstanceUnavaliable, message).into());
    }

    let route_key = ServiceRouteKey {
        instance_name,
        plugin_name: &plugin.name,
        plugin_version: &plugin.version,
        service_name: service,
    };
    let Some(route) = node.storage.service_route_table.get_raw(&route_key)? else {
        let message = format!("service '{plugin}.{service}' is not running on {instance_name}");
        return Err(BoxError::new(ErrorCode::ServiceNotStarted, message).into());
    };

    let poison_flag = route
        .field(ServiceRouteItem::FIELD_POISON)
        .map_err(IntoBoxError::into_box_error)?;
    match poison_flag {
        None => Err(BoxError::new(
            ErrorCode::StorageCorrupted,
            "invalid contents has _pico_service_route",
        )
        .into()),
        Some(true) => {
            let message = format!("service '{plugin}.{service}' is poisoned on {instance_name}");
            Err(BoxError::new(ErrorCode::ServicePoisoned, message).into())
        }
        Some(false) => Ok(()),
    }
}
/// Returns an error if the replicaset identified by `uuid` has been expelled.
///
/// `maybe_tuple` is the caller's already-fetched `_pico_replicaset` tuple for
/// that replicaset, if it has one. Otherwise the tuple is looked up by `uuid`.
///
/// # Errors
/// - `ReplicasetExpelled`: the replicaset's state is `Expelled`.
/// - `StorageCorrupted`: the tuple has no state field. This matches how the
///   other `_pico_replicaset` field lookups in this module report a missing
///   field, instead of panicking.
/// - Any error from the storage lookup or field decoding.
fn check_replicaset_is_not_expelled(
    node: &Node,
    uuid: &str,
    maybe_tuple: Option<Tuple>,
) -> Result<(), Error> {
    // Reuse the caller's tuple when available to avoid a second storage read.
    let tuple = match maybe_tuple {
        Some(tuple) => tuple,
        None => node.storage.replicasets.by_uuid_raw(uuid)?,
    };

    let state: Option<ReplicasetState> = tuple.field(Replicaset::FIELD_STATE)?;
    let Some(state) = state else {
        #[rustfmt::skip]
        return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'state' field in _pico_replicaset tuple").into());
    };

    if state == ReplicasetState::Expelled {
        #[rustfmt::skip]
        return Err(BoxError::new(ErrorCode::ReplicasetExpelled, format!("replicaset with id {uuid} was expelled")).into());
    }

    Ok(())
}