Skip to content
Snippets Groups Projects
Commit 2a5e33ad authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix: make target the mandatory parameter for rpc::RequestBuilder

parent 1367a5a4
No related branches found
No related tags found
1 merge request!973plugin rpc sdk
......@@ -2,4 +2,5 @@ pub mod client;
pub mod server;
pub use client::RequestBuilder;
pub use client::RequestTarget;
pub use server::RouteBuilder;
......@@ -19,59 +19,43 @@ use tarantool::util::DisplayAsHexBytes;
// RequestBuilder
////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct RequestBuilder<'a> {
target: FfiSafeRpcTargetSpecifier,
plugin_service: Option<(&'a str, &'a str)>,
version: Option<&'a str>,
path: Option<&'a str>,
target: Option<FfiSafeRpcTargetSpecifier>,
input: Option<Cow<'a, [u8]>>,
timeout: Option<Duration>,
}
impl<'a> RequestBuilder<'a> {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
#[track_caller]
pub fn instance_id(mut self, instance_id: &'a str) -> Self {
let new = FfiSafeRpcTargetSpecifier::InstanceId(instance_id.into());
if let Some(old) = self.target.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder target is silently changed from {old:?} to {new:?}");
}
self.target = Some(new);
self
}
#[inline]
#[track_caller]
pub fn replicaset_id(mut self, replicaset_id: &'a str, to_master: bool) -> Self {
let new = FfiSafeRpcTargetSpecifier::Replicaset {
replicaset_id: replicaset_id.into(),
to_master,
pub fn new(target: RequestTarget<'a>) -> Self {
let target = match target {
RequestTarget::Any => FfiSafeRpcTargetSpecifier::Any,
RequestTarget::InstanceId(instance_id) => {
FfiSafeRpcTargetSpecifier::InstanceId(instance_id.into())
}
RequestTarget::BucketId(bucket_id, to_master) => FfiSafeRpcTargetSpecifier::BucketId {
bucket_id,
to_master,
},
RequestTarget::ReplicasetId(replicaset_id, to_master) => {
FfiSafeRpcTargetSpecifier::Replicaset {
replicaset_id: replicaset_id.into(),
to_master,
}
}
};
if let Some(old) = self.target.take() {
#[rustfmt::skip]
tarantool::say_warn!("RequestBuilder target is silently changed from {old:?} to {new:?}");
}
self.target = Some(new);
self
}
#[inline]
#[track_caller]
#[rustfmt::skip]
pub fn bucket_id(mut self, bucket_id: u64, to_master: bool) -> Self {
let new = FfiSafeRpcTargetSpecifier::BucketId { bucket_id, to_master };
if let Some(old) = self.target.take() {
tarantool::say_warn!("RequestBuilder target is silently changed from {old:?} to {new:?}");
Self {
target,
plugin_service: None,
version: None,
path: None,
input: None,
timeout: None,
}
self.target = Some(new);
self
}
#[inline]
......@@ -177,7 +161,7 @@ impl<'a> RequestBuilder<'a> {
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "input must be specified for RPC request"));
};
let target = self.target.unwrap_or(FfiSafeRpcTargetSpecifier::Any);
let target = self.target;
Ok(FfiSafeRpcRequestArguments {
plugin: plugin.into(),
......@@ -198,6 +182,32 @@ impl<'a> RequestBuilder<'a> {
}
}
/// An enumeration of possible target specifiers for RPC requests.
/// Determines which instance in the picodata cluster the request should be sent to.
#[derive(Default, Debug, Clone, Copy)]
#[non_exhaustive]
pub enum RequestTarget<'a> {
/// Any instance running the corresponding service.
#[default]
Any,
/// The specific instance with a given instance id.
InstanceId(&'a str),
/// An instance in the replicaset which currently stores the bucket with
/// the specified id.
///
/// If the boolean parameter is `true`, then send the request to the replicaset master,
/// otherwise any replica.
BucketId(u64, bool),
/// An instance in the replicaset determined by the explicit replicaset id.
///
/// If the boolean parameter is `true`, then send the request to the replicaset master,
/// otherwise any replica.
ReplicasetId(&'a str, bool),
}
////////////////////////////////////////////////////////////////////////////////
// ffi wrappers
////////////////////////////////////////////////////////////////////////////////
......
......@@ -511,7 +511,20 @@ impl Service for ServiceWithRpcTests {
// TODO: impl From<*> for BoxError
.map_err(|e| BoxError::new(TarantoolErrorCode::IllegalParams, e.to_string()))?;
let mut builder = rpc::RequestBuilder::new();
let mut target = rpc::RequestTarget::Any;
if let Some(instance_id) = &request.instance_id {
target = rpc::RequestTarget::InstanceId(instance_id);
} else if let Some(replicaset_id) = &request.replicaset_id {
target = rpc::RequestTarget::ReplicasetId(
replicaset_id,
request.to_master.unwrap_or(false),
);
} else if let Some(bucket_id) = request.bucket_id {
target =
rpc::RequestTarget::BucketId(bucket_id, request.to_master.unwrap_or(false));
}
let mut builder = rpc::RequestBuilder::new(target);
if let Some((plugin, service, version)) = &request.service_info {
builder = builder
.plugin_service(plugin, service)
......@@ -527,15 +540,6 @@ impl Service for ServiceWithRpcTests {
timeout = Duration::from_secs_f64(t.float().unwrap());
}
if let Some(instance_id) = &request.instance_id {
builder = builder.instance_id(instance_id);
} else if let Some(replicaset_id) = &request.replicaset_id {
builder =
builder.replicaset_id(replicaset_id, request.to_master.unwrap_or(false));
} else if let Some(bucket_id) = request.bucket_id {
builder = builder.bucket_id(bucket_id, request.to_master.unwrap_or(false));
}
let output = builder
.path(&request.path)
.raw_input(&request.input)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment