diff --git a/picoplugin/src/transport/rpc.rs b/picoplugin/src/transport/rpc.rs index 878efddc4e4b86ddd15a0fdae437cb02bff67c47..783e98441b22fd2aba4732ce4ebc1edf97699b62 100644 --- a/picoplugin/src/transport/rpc.rs +++ b/picoplugin/src/transport/rpc.rs @@ -141,6 +141,13 @@ enum ResponseImpl { Owned(Box<[u8]>), } +impl Default for Response { + #[inline(always)] + fn default() -> Self { + Self::empty() + } +} + impl Response { /// This method is for **internal use only**. /// @@ -219,6 +226,17 @@ impl Response { }) } + /// Constructs an empty response to the RPC request. + /// + /// Use this or even [`Default::default`] when your RPC handler doesn't + /// need to return anything. + /// + /// This method should be used on the **server side** of the RPC request. + #[inline(always)] + pub fn empty() -> Self { + Self::from_static(b"") + } + /// Constructs a response to the RPC request from provided raw bytes. /// The bytes are sent as is. /// diff --git a/src/plugin/manager.rs b/src/plugin/manager.rs index 8b7618cfabceb4232987ece78a9201e205fcc283..5558d370e71c6c2f4b15a1243ddc3288ec49d28b 100644 --- a/src/plugin/manager.rs +++ b/src/plugin/manager.rs @@ -875,7 +875,7 @@ fn stop_service(service: &mut Service, context: &PicoContext) { ); } - rpc::server::unregister_all_rpc_handlers(service); + rpc::server::unregister_all_rpc_handlers(&service.plugin_name, &service.name, &service.version); } /// Plugin manager inner loop, using for handle async events (must be run in a separate fiber). diff --git a/src/plugin/rpc/client.rs b/src/plugin/rpc/client.rs index c98e718fa19919b379db64f826546b3613c95b52..5e6a315cd2bb787df6b5aa5fc7e8f2e4ef7b622d 100644 --- a/src/plugin/rpc/client.rs +++ b/src/plugin/rpc/client.rs @@ -126,7 +126,7 @@ fn call_builtin_stored_proc( copy_to_region(&output).map_err(Into::into) } -fn encode_request_arguments( +pub(crate) fn encode_request_arguments( buffer: &mut Vec<u8>, path: &str, input: &[u8], diff --git a/src/plugin/rpc/server.rs b/src/plugin/rpc/server.rs index f081020d57486ccc48e5f5b95c8f56729463405e..2deaaf2107442650c03e6c61166ea9e2f0eda0ef 100644 --- a/src/plugin/rpc/server.rs +++ b/src/plugin/rpc/server.rs @@ -1,5 +1,4 @@ use crate::error_code::ErrorCode; -use crate::plugin::Service; use crate::tlog; use picoplugin::transport::context::FfiSafeContext; use picoplugin::transport::rpc::server::FfiRpcHandler; @@ -202,13 +201,13 @@ pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { Ok(()) } -pub fn unregister_all_rpc_handlers(service: &Service) { +pub fn unregister_all_rpc_handlers(plugin_name: &str, service_name: &str, plugin_version: &str) { // SAFETY: safe because we don't leak any references to the stored data let handlers = unsafe { handlers_mut() }; handlers.retain(|_, handler| { - let matches = handler.plugin() == service.plugin_name - && handler.service() == service.name - && handler.version() == service.version; + let matches = handler.plugin() == plugin_name + && handler.service() == service_name + && handler.version() == plugin_version; if matches { tlog!( Info, @@ -246,3 +245,113 @@ fn msgpack_read_array(data: &[u8]) -> Result<Vec<&[u8]>, TntError> { Ok(result) } + +//////////////////////////////////////////////////////////////////////////////// +// tests +//////////////////////////////////////////////////////////////////////////////// + +// #[cfg(feature = "internal_test")] +mod test { + use super::*; + use picoplugin::transport::rpc; + use std::cell::Cell; + use tarantool::fiber; + + #[derive(Clone, Default)] + struct DropCheck(Rc<Cell<bool>>); + impl Drop for DropCheck { + fn drop(&mut self) { + self.0.set(true) + } + } + + fn call_rpc_local( + plugin: &str, + service: &str, + version: &str, + path: &str, + input: &[u8], + ) -> Result<&'static RawBytes, TntError> { + let mut buffer = vec![]; + let request_id = tarantool::uuid::Uuid::random(); + crate::plugin::rpc::client::encode_request_arguments( + &mut buffer, + path, + input, + &request_id, + plugin, + service, + version, + ) + .unwrap(); + proc_rpc_dispatch_impl(buffer.as_slice().into()) + } + + #[tarantool::test] + fn rpc_handler_no_use_after_free() { + init_handlers(); + + let plugin_name = "plugin"; + let service_name = "service"; + let plugin_version = "3.14.78-rc37.2"; + + let cond_tx = Rc::new(fiber::Cond::new()); + let cond_rx = cond_tx.clone(); + let drop_check_rx = DropCheck::default(); + let drop_check_tx = drop_check_rx.clone(); + let n_simultaneous_fibers_rx = Rc::new(Cell::new(0)); + let n_simultaneous_fibers_tx = n_simultaneous_fibers_rx.clone(); + + let builder = unsafe { + rpc::RouteBuilder::from_service_info(plugin_name, service_name, plugin_version) + }; + builder + .path("/test-path") + .register(move |request, context| { + _ = request; + _ = context; + + _ = &drop_check_tx; + + let was = n_simultaneous_fibers_tx.get(); + n_simultaneous_fibers_tx.set(was + 1); + + cond_rx.wait(); + + Ok(Default::default()) + }) + .unwrap(); + + // Control checks: + // - the closure isn't dropped yet (obviously) + assert_eq!(drop_check_rx.0.get(), false); + // - no fibers have entered the closure yet + assert_eq!(n_simultaneous_fibers_rx.get(), 0); + + let jh1 = fiber::start(|| { + call_rpc_local(plugin_name, service_name, plugin_version, "/test-path", b"").unwrap() + }); + let jh2 = fiber::start(|| { + call_rpc_local(plugin_name, service_name, plugin_version, "/test-path", b"").unwrap() + }); + + // - no reason for the closure to be dropped yet + assert_eq!(drop_check_rx.0.get(), false); + // - both fibers have entered the closure + assert_eq!(n_simultaneous_fibers_rx.get(), 2); + + // Unregister the handler. Now the closure should be dropped ASAP + unregister_all_rpc_handlers(plugin_name, service_name, plugin_version); + + // - The closure has still not been dropped, because the fiber's a keeping it alive (holding strong references) + assert_eq!(drop_check_rx.0.get(), false); + + // Wake up and join the fibers. + cond_tx.broadcast(); + jh1.join(); + jh2.join(); + + // - Finally all strong references to the closure have been dropped, and so was the closure + assert_eq!(drop_check_rx.0.get(), true); + } +}