From 68294a4826ec1baa4c7d9b9a927c16f5b5873918 Mon Sep 17 00:00:00 2001
From: godzie44 <godzie@yandex.ru>
Date: Thu, 12 Sep 2024 11:35:04 +0300
Subject: [PATCH] refactor(plugin): RPC now redirect to local functions when
 possible

Closes #708
---
 src/plugin/rpc/client.rs | 43 ++++++++++++++++++++++++++--------------
 src/plugin/rpc/server.rs |  8 ++++++--
 test/int/test_plugin.py  | 19 ++++++++++++++++++
 3 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/src/plugin/rpc/client.rs b/src/plugin/rpc/client.rs
index 15d69a2376..30f79ecbe1 100644
--- a/src/plugin/rpc/client.rs
+++ b/src/plugin/rpc/client.rs
@@ -26,6 +26,22 @@ use tarantool::uuid::Uuid;
 // rpc out
 ////////////////////////////////////////////////////////////////////////////////
 
+fn process_rpc_output(mut output: &[u8]) -> Result<&'static [u8], Error> {
+    let output_len = rmp::decode::read_bin_len(&mut output).map_err(|e| {
+        BoxError::new(
+            TarantoolErrorCode::InvalidMsgpack,
+            format!("expected bin: {e}"),
+        )
+    })?;
+    if output.len() != output_len as usize {
+        #[rustfmt::skip]
+        return Err(BoxError::new(TarantoolErrorCode::InvalidMsgpack, format!("this is weird: {output_len} != {}", output.len())).into());
+    }
+
+    let res = copy_to_region(output)?;
+    return Ok(res);
+}
+
 /// Returns data allocated on the region allocator (or statically allocated).
 pub(crate) fn send_rpc_request(
     plugin_identity: &PluginIdentifier,
@@ -42,6 +58,11 @@ pub(crate) fn send_rpc_request(
 
     let instance_id = resolve_rpc_target(plugin_identity, service, target, node)?;
 
+    let my_instance_id = node
+        .raft_storage
+        .instance_id()?
+        .expect("should be persisted at this point");
+
     if path.starts_with('.') {
         return call_builtin_stored_proc(pool, path, input, &instance_id, timeout);
     }
@@ -61,6 +82,12 @@ pub(crate) fn send_rpc_request(
     // Safe because buffer contains a msgpack array
     let args = unsafe { TupleBuffer::from_vec_unchecked(buffer) };
 
+    if instance_id == my_instance_id {
+        let output = rpc::server::proc_rpc_dispatch_impl(args.as_ref().into())?;
+        return process_rpc_output(output);
+    };
+
+    crate::error_injection!("RPC_NETWORK_ERROR" => return Err(Error::other("injected error")));
     tlog!(Debug, "sending plugin RPC request";
         "instance_id" => %instance_id,
         "request_id" => %request_id,
@@ -74,21 +101,7 @@ pub(crate) fn send_rpc_request(
     )?;
     // FIXME: remove this extra allocation for RawByteBuf
     let output: RawByteBuf = fiber::block_on(future)?;
-
-    let mut output = &**output;
-    let output_len = rmp::decode::read_bin_len(&mut output).map_err(|e| {
-        BoxError::new(
-            TarantoolErrorCode::InvalidMsgpack,
-            format!("expected bin: {e}"),
-        )
-    })?;
-    if output.len() != output_len as usize {
-        #[rustfmt::skip]
-        return Err(BoxError::new(TarantoolErrorCode::InvalidMsgpack, format!("this is weird: {output_len} != {}", output.len())).into());
-    }
-
-    let res = copy_to_region(output)?;
-    Ok(res)
+    process_rpc_output(&output)
 }
 
 fn call_builtin_stored_proc(
diff --git a/src/plugin/rpc/server.rs b/src/plugin/rpc/server.rs
index 676841e69f..68110f5cee 100644
--- a/src/plugin/rpc/server.rs
+++ b/src/plugin/rpc/server.rs
@@ -17,8 +17,7 @@ use tarantool::unwrap_ok_or;
 // proc_rpc_dispatch
 ////////////////////////////////////////////////////////////////////////////////
 
-#[tarantool::proc(packed_args)]
-pub fn proc_rpc_dispatch(args: &RawBytes) -> Result<&'static RawBytes, TntError> {
+pub fn proc_rpc_dispatch_impl(args: &RawBytes) -> Result<&'static RawBytes, TntError> {
     let msgpack_args = msgpack_read_array(args)?;
     let [path, mut input, context] = msgpack_args[..] else {
         #[rustfmt::skip]
@@ -93,6 +92,11 @@ pub fn proc_rpc_dispatch(args: &RawBytes) -> Result<&'static RawBytes, TntError>
     Ok(RawBytes::new(slice))
 }
 
+#[tarantool::proc(packed_args)]
+pub fn proc_rpc_dispatch(args: &RawBytes) -> Result<&'static RawBytes, TntError> {
+    proc_rpc_dispatch_impl(args)
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // handler storage
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py
index f200f8e069..566258bb14 100644
--- a/test/int/test_plugin.py
+++ b/test/int/test_plugin.py
@@ -2269,6 +2269,25 @@ cluster:
     output = i1.call(".proc_rpc_dispatch", "/proxy", msgpack.dumps(input), context)
     assert msgpack.loads(output) == ["pong", i2.instance_id, b"how are you?"]
 
+    i1.call("pico._inject_error", "RPC_NETWORK_ERROR", True)
+    context = make_context()
+
+    # check that rpc call to a non-self instance will fail
+    input = dict(
+        path="/ping",
+        instance_id=i2.instance_id,
+        input="how are you?",
+    )
+    with pytest.raises(TarantoolError, match="injected error"):
+        i1.call(".proc_rpc_dispatch", "/proxy", msgpack.dumps(input), context)
+
+    # check self-calling RPC (this should not use a network)
+    input["instance_id"] = i1.instance_id
+    output = i1.call(".proc_rpc_dispatch", "/proxy", msgpack.dumps(input), context)
+    assert msgpack.loads(output) == ["pong", i1.instance_id, b"how are you?"]
+
+    i1.call("pico._inject_error", "RPC_NETWORK_ERROR", False)
+
     # Check calling RPC to ANY instance via the plugin SDK
     context = make_context()
     input = dict(
-- 
GitLab