From 4908c7c28b62651e25a70f25176ca720efb6f361 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 23 Sep 2024 18:15:08 +0300 Subject: [PATCH] fix: reduce size of FfiRpcHandler struct --- picoplugin/src/transport/rpc/server.rs | 239 +++++++++++++++---------- src/plugin/rpc/server.rs | 66 +++---- 2 files changed, 164 insertions(+), 141 deletions(-) diff --git a/picoplugin/src/transport/rpc/server.rs b/picoplugin/src/transport/rpc/server.rs index 39b6c0d5c5..40be206967 100644 --- a/picoplugin/src/transport/rpc/server.rs +++ b/picoplugin/src/transport/rpc/server.rs @@ -87,13 +87,13 @@ impl<'a> RouteBuilder<'a> { return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC endpoint")); }; - let identifier = FfiRpcRouteIdentifier { - path: path.into(), - plugin: self.plugin.into(), - service: self.service.into(), - version: self.version.into(), - }; - let handler = FfiRpcHandler::new(&identifier, f); + let identifier = PackedServiceIdentifier::pack( + path.into(), + self.plugin.into(), + self.service.into(), + self.version.into(), + )?; + let handler = FfiRpcHandler::new(identifier, f); if let Err(e) = register_rpc_handler(handler) { // Note: recreating the error to capture the caller's source location #[rustfmt::skip] @@ -115,17 +115,6 @@ impl<'a> From<&'a PicoContext> for RouteBuilder<'a> { // ffi wrappers //////////////////////////////////////////////////////////////////////////////// -/// **For internal use**. -/// -/// Use [`RouteBuilder`] instead. -#[derive(Debug, Default, Clone, Copy)] -pub struct FfiRpcRouteIdentifier { - pub path: FfiSafeStr, - pub plugin: FfiSafeStr, - pub service: FfiSafeStr, - pub version: FfiSafeStr, -} - /// **For internal use**. #[inline] fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { @@ -163,21 +152,8 @@ pub struct FfiRpcHandler { /// But when calling the closure, the `const` pointer should be used. closure_pointer: *mut (), - /// Points into [`Self::string_storage`]. - path: FfiSafeStr, - /// Points into [`Self::string_storage`]. - plugin_name: FfiSafeStr, - /// Points into [`Self::string_storage`]. - service_name: FfiSafeStr, - /// Points into [`Self::string_storage`]. - plugin_version: FfiSafeStr, - /// Points into [`Self::string_storage`]. - route_repr: FfiSafeStr, - /// This data is owned by this struct (freed on drop). - /// This slice stores all of the strings above, so that when it's needed to - /// be dropped we only need to free one slice. - string_storage: FfiSafeBytes, + pub identifier: PackedServiceIdentifier, } impl Drop for FfiRpcHandler { @@ -188,67 +164,19 @@ impl Drop for FfiRpcHandler { } impl FfiRpcHandler { - fn new<F>(identifier: &FfiRpcRouteIdentifier, f: F) -> Self + fn new<F>(identifier: PackedServiceIdentifier, f: F) -> Self where F: Fn(Request<'_>, &mut Context) -> Result<Response, BoxError> + 'static, { let closure = Box::new(f); let closure_pointer: *mut F = Box::into_raw(closure); - // - // Store the strings in a contiguous slice of memory and set the pointers appropriately - // - let total_string_len = identifier.plugin.len() - // For an extra '.' between plugin and service names - + 1 - + identifier.service.len() - + identifier.path.len() - + identifier.version.len(); - let mut string_storage = Vec::with_capacity(total_string_len); - let start = string_storage.as_mut_ptr(); - - let mut p = start; - let mut push_and_get_slice = |s: FfiSafeStr| unsafe { - string_storage.extend_from_slice(s.as_bytes()); - let res = FfiSafeStr::from_raw_parts(NonNull::new_unchecked(p), s.len()); - p = p.add(s.len()); - res - }; - let plugin_name = push_and_get_slice(identifier.plugin); - push_and_get_slice(".".into()); - let service_name = push_and_get_slice(identifier.service); - let path = push_and_get_slice(identifier.path); - let plugin_version = push_and_get_slice(identifier.version); - let route_repr = unsafe { - FfiSafeStr::from_raw_parts( - NonNull::new_unchecked(start), - total_string_len - plugin_version.len(), - ) - }; - - debug_assert_eq!( - start, - string_storage.as_mut_ptr(), - "vec must not have been reallocated, because we store pointers into it" - ); - let capacity = string_storage.capacity(); - - // Self now ownes this data and will be freeing it in it's `drop`. - std::mem::forget(string_storage); - - let string_storage = unsafe { std::slice::from_raw_parts(start, capacity) }; - FfiRpcHandler { callback: Self::trampoline::<F>, drop: Self::drop_handler::<F>, closure_pointer: closure_pointer.cast(), - path, - plugin_name, - service_name, - plugin_version, - route_repr, - string_storage: string_storage.into(), + identifier, } } @@ -299,13 +227,7 @@ impl FfiRpcHandler { (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _; } - let (pointer, capacity) = (*handler).string_storage.into_raw_parts(); - // Note: we pretend the original Vec was filled to capacity which - // may or may not be true, there might have been some unitialized - // data at the end. But this doesn't matter in this case because we - // just want to drop the data, and only the capacity matters. - let string_storage = Vec::from_raw_parts(pointer, capacity, capacity); - drop(string_storage); + (*handler).identifier.drop(); } } @@ -330,34 +252,153 @@ impl FfiRpcHandler { Ok(result) } +} + +/// **For internal use**. +/// +/// Use [`RouteBuilder`] instead. +/// +/// This struct stores an RPC route identifier in the following packed format: +/// `{plugin}.{service}{path}{version}`. This format allows for efficient +/// extraction of the RPC route identifier for purposes of logging (note that +/// version is not displayed), and also losslessly stores info about all the +/// parts of the idenifier. +/// +/// This represnetation also adds a constraint on the maximum length of the +/// plugin name, service name and path, each one of them must not be longer than +/// 65535 bytes (which is obviously engough for anybody). +#[repr(C)] +#[derive(Debug, Default, Clone, Copy)] +pub struct PackedServiceIdentifier { + pub storage: FfiSafeStr, + pub plugin_len: u16, + pub service_len: u16, + pub path_len: u16, + pub version_len: u16, +} + +impl PackedServiceIdentifier { + fn pack(path: &str, plugin: &str, service: &str, version: &str) -> Result<Self, BoxError> { + let Ok(plugin_len) = plugin.len().try_into() else { + #[rustfmt::skip] + return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("plugin name length must not exceed 65535, got {}", plugin.len()))); + }; + let Ok(service_len) = service.len().try_into() else { + #[rustfmt::skip] + return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("service name length must not exceed 65535, got {}", service.len()))); + }; + let Ok(path_len) = path.len().try_into() else { + #[rustfmt::skip] + return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("route path length must not exceed 65535, got {}", path.len()))); + }; + let Ok(version_len) = version.len().try_into() else { + #[rustfmt::skip] + return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("version string length must not exceed 65535, got {}", version.len()))); + }; + + let total_string_len = plugin_len + // For an extra '.' between plugin and service names + + 1 + + service_len + + path_len + + version_len; + let mut string_storage = Vec::with_capacity(total_string_len as _); + string_storage.extend_from_slice(plugin.as_bytes()); + string_storage.push(b'.'); + string_storage.extend_from_slice(service.as_bytes()); + string_storage.extend_from_slice(path.as_bytes()); + string_storage.extend_from_slice(version.as_bytes()); + + let start = string_storage.as_mut_ptr(); + let capacity = string_storage.capacity(); + + // Safety: vec has an allocated buffer, so the pointer cannot be null. + // Also a concatenation of utf8 strings is always a utf8 string. + let storage = + unsafe { FfiSafeStr::from_raw_parts(NonNull::new_unchecked(start), capacity) }; + + // Self now owns this data and will be freeing it in it's `drop`. + std::mem::forget(string_storage); + + Ok(Self { + storage, + plugin_len, + service_len, + path_len, + version_len, + }) + } + + #[allow(unreachable_code)] + pub(crate) fn drop(&mut self) { + let (pointer, capacity) = self.storage.into_raw_parts(); + if capacity == 0 { + #[cfg(debug_assertions)] + unreachable!("drop should only be called once"); + return; + } + + // Note: we pretend the original Vec was filled to capacity which + // may or may not be true, there might have been some unitialized + // data at the end. But this doesn't matter in this case because we + // just want to drop the data, and only the capacity matters. + // Safety: safe because drop only happens once and the next time the + // pointer will be set to null. + unsafe { + let string_storage = Vec::from_raw_parts(pointer, capacity, capacity); + drop(string_storage); + } + // Overwrite with len = 0, to guard against double free + self.storage = FfiSafeStr::from(""); + } #[inline(always)] - pub fn path(&self) -> &str { + fn storage_slice(&self, start: u16, len: u16) -> &str { // SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing - unsafe { self.path.as_str() } + let storage = unsafe { self.storage.as_str() }; + let end = (start + len) as usize; + &storage[start as usize..end] } #[inline(always)] pub fn plugin(&self) -> &str { - // SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing - unsafe { self.plugin_name.as_str() } + self.storage_slice(0, self.plugin_len) } #[inline(always)] pub fn service(&self) -> &str { - // SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing - unsafe { self.service_name.as_str() } + self.storage_slice(self.plugin_len + 1, self.service_len) + } + + #[inline(always)] + pub fn path(&self) -> &str { + self.storage_slice(self.plugin_len + 1 + self.service_len, self.path_len) } #[inline(always)] pub fn route_repr(&self) -> &str { - // SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing - unsafe { self.route_repr.as_str() } + self.storage_slice(0, self.plugin_len + 1 + self.service_len + self.path_len) } #[inline(always)] pub fn version(&self) -> &str { - // SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing - unsafe { self.plugin_version.as_str() } + self.storage_slice( + self.plugin_len + 1 + self.service_len + self.path_len, + self.version_len, + ) + } +} + +impl std::fmt::Display for PackedServiceIdentifier { + #[inline(always)] + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{}.{}:v{}{}", + self.plugin(), + self.service(), + self.version(), + self.path() + ) } } diff --git a/src/plugin/rpc/server.rs b/src/plugin/rpc/server.rs index 2deaaf2107..3d15fc2fef 100644 --- a/src/plugin/rpc/server.rs +++ b/src/plugin/rpc/server.rs @@ -68,17 +68,17 @@ pub fn proc_rpc_dispatch_impl(args: &RawBytes) -> Result<&'static RawBytes, TntE // SAFETY: safe because it doesn't outlive `args` let v_requestor = unsafe { context.plugin_version.as_str() }; - if handler.version() != v_requestor { + let v_handler = handler.identifier.version(); + if v_handler != v_requestor { return Err(BoxError::new(ErrorCode::WrongPluginVersion, format!("RPC request to an endpoint `{plugin}.{service}{path}` with incompatible version (requestor: {v_requestor}, handler: {v_handler})", plugin=key.plugin, service=key.service, - v_handler=handler.version(), )).into()); } // TODO: check service is not poisoned - fiber::set_name(handler.route_repr()); + fiber::set_name(handler.identifier.route_repr()); let output = handler .call(input, &context) .map_err(|()| BoxError::last())?; @@ -126,33 +126,34 @@ unsafe fn handlers_mut() -> &'static mut RpcHandlerMap { } pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { - if handler.path().is_empty() { + let identifier = &handler.identifier; + if identifier.path().is_empty() { #[rustfmt::skip] return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route path cannot be empty")); - } else if !handler.path().starts_with('/') { + } else if !identifier.path().starts_with('/') { #[rustfmt::skip] - return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("RPC route path must start with '/', got '{}'", handler.path()))); + return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("RPC route path must start with '/', got '{}'", identifier.path()))); } - if handler.plugin().is_empty() { + if identifier.plugin().is_empty() { #[rustfmt::skip] return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route plugin name cannot be empty")); } - if handler.service().is_empty() { + if identifier.service().is_empty() { #[rustfmt::skip] return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service name cannot be empty")); } - if handler.version().is_empty() { + if identifier.version().is_empty() { #[rustfmt::skip] return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty")); } let key = RpcHandlerKey { - plugin: handler.plugin(), - service: handler.service(), - path: handler.path(), + plugin: identifier.plugin(), + service: identifier.service(), + path: identifier.path(), }; // SAFETY: this is safe as long as we never let users touch `RpcHandlerKey`, @@ -168,35 +169,23 @@ pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { let key = e.key(); let old_handler = e.get(); + let v_old = old_handler.identifier.version(); + let v_new = handler.identifier.version(); #[rustfmt::skip] - if old_handler.version() != handler.version() { - let message = format!("RPC endpoint `{plugin}.{service}{path}` is already registered with a different version (old: {old_version}, new: {new_version})", plugin=key.plugin, service=key.service, path=key.path, old_version=old_handler.version(), new_version=handler.version()); + if v_old != v_new { + let message = format!("RPC endpoint `{plugin}.{service}{path}` is already registered with a different version (old: {v_old}, new: {v_new})", plugin=key.plugin, service=key.service, path=key.path); return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); } else if old_handler.identity() != handler.identity() { - let message = format!("RPC endpoint `{plugin}.{service}:v{version}{path}` is already registered with a different handler", plugin = key.plugin, service = key.service, version = old_handler.version(), path = key.path); + let message = format!("RPC endpoint `{}` is already registered with a different handler", old_handler.identifier); return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); } else { - tlog!( - Info, - "RPC endpoint `{plugin}.{service}:v{version}{path}` is already registered", - plugin = handler.plugin(), - service = handler.service(), - version = handler.version(), - path = handler.path(), - ); + tlog!(Info, "RPC endpoint `{}` is already registered", handler.identifier); return Ok(()); }; } }; - tlog!( - Info, - "registered RPC endpoint `{}.{}:v{}{}`", - handler.plugin(), - handler.service(), - handler.version(), - handler.path(), - ); + tlog!(Info, "registered RPC endpoint `{}`", handler.identifier); entry.insert(Rc::new(handler)); Ok(()) } @@ -205,18 +194,11 @@ pub fn unregister_all_rpc_handlers(plugin_name: &str, service_name: &str, plugin // SAFETY: safe because we don't leak any references to the stored data let handlers = unsafe { handlers_mut() }; handlers.retain(|_, handler| { - let matches = handler.plugin() == plugin_name - && handler.service() == service_name - && handler.version() == plugin_version; + let matches = handler.identifier.plugin() == plugin_name + && handler.identifier.service() == service_name + && handler.identifier.version() == plugin_version; if matches { - tlog!( - Info, - "unregistered RPC endpoint `{}.{}-v{}{}`", - handler.plugin(), - handler.service(), - handler.version(), - handler.path(), - ); + tlog!(Info, "unregistered RPC endpoint `{}`", handler.identifier); // Don't retain false } else { -- GitLab