Skip to content
Snippets Groups Projects
Commit 4908c7c2 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Dmitry Rodionov
Browse files

fix: reduce size of FfiRpcHandler struct

parent c20eee2c
No related branches found
No related tags found
1 merge request!1350fix: plugin metrics sdk used to do some undefined behavior
......@@ -87,13 +87,13 @@ impl<'a> RouteBuilder<'a> {
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "path must be specified for RPC endpoint"));
};
let identifier = FfiRpcRouteIdentifier {
path: path.into(),
plugin: self.plugin.into(),
service: self.service.into(),
version: self.version.into(),
};
let handler = FfiRpcHandler::new(&identifier, f);
let identifier = PackedServiceIdentifier::pack(
path.into(),
self.plugin.into(),
self.service.into(),
self.version.into(),
)?;
let handler = FfiRpcHandler::new(identifier, f);
if let Err(e) = register_rpc_handler(handler) {
// Note: recreating the error to capture the caller's source location
#[rustfmt::skip]
......@@ -115,17 +115,6 @@ impl<'a> From<&'a PicoContext> for RouteBuilder<'a> {
// ffi wrappers
////////////////////////////////////////////////////////////////////////////////
/// **For internal use**.
///
/// Use [`RouteBuilder`] instead.
#[derive(Debug, Default, Clone, Copy)]
pub struct FfiRpcRouteIdentifier {
pub path: FfiSafeStr,
pub plugin: FfiSafeStr,
pub service: FfiSafeStr,
pub version: FfiSafeStr,
}
/// **For internal use**.
#[inline]
fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> {
......@@ -163,21 +152,8 @@ pub struct FfiRpcHandler {
/// But when calling the closure, the `const` pointer should be used.
closure_pointer: *mut (),
/// Points into [`Self::string_storage`].
path: FfiSafeStr,
/// Points into [`Self::string_storage`].
plugin_name: FfiSafeStr,
/// Points into [`Self::string_storage`].
service_name: FfiSafeStr,
/// Points into [`Self::string_storage`].
plugin_version: FfiSafeStr,
/// Points into [`Self::string_storage`].
route_repr: FfiSafeStr,
/// This data is owned by this struct (freed on drop).
/// This slice stores all of the strings above, so that when it's needed to
/// be dropped we only need to free one slice.
string_storage: FfiSafeBytes,
pub identifier: PackedServiceIdentifier,
}
impl Drop for FfiRpcHandler {
......@@ -188,67 +164,19 @@ impl Drop for FfiRpcHandler {
}
impl FfiRpcHandler {
fn new<F>(identifier: &FfiRpcRouteIdentifier, f: F) -> Self
fn new<F>(identifier: PackedServiceIdentifier, f: F) -> Self
where
F: Fn(Request<'_>, &mut Context) -> Result<Response, BoxError> + 'static,
{
let closure = Box::new(f);
let closure_pointer: *mut F = Box::into_raw(closure);
//
// Store the strings in a contiguous slice of memory and set the pointers appropriately
//
let total_string_len = identifier.plugin.len()
// For an extra '.' between plugin and service names
+ 1
+ identifier.service.len()
+ identifier.path.len()
+ identifier.version.len();
let mut string_storage = Vec::with_capacity(total_string_len);
let start = string_storage.as_mut_ptr();
let mut p = start;
let mut push_and_get_slice = |s: FfiSafeStr| unsafe {
string_storage.extend_from_slice(s.as_bytes());
let res = FfiSafeStr::from_raw_parts(NonNull::new_unchecked(p), s.len());
p = p.add(s.len());
res
};
let plugin_name = push_and_get_slice(identifier.plugin);
push_and_get_slice(".".into());
let service_name = push_and_get_slice(identifier.service);
let path = push_and_get_slice(identifier.path);
let plugin_version = push_and_get_slice(identifier.version);
let route_repr = unsafe {
FfiSafeStr::from_raw_parts(
NonNull::new_unchecked(start),
total_string_len - plugin_version.len(),
)
};
debug_assert_eq!(
start,
string_storage.as_mut_ptr(),
"vec must not have been reallocated, because we store pointers into it"
);
let capacity = string_storage.capacity();
// Self now ownes this data and will be freeing it in it's `drop`.
std::mem::forget(string_storage);
let string_storage = unsafe { std::slice::from_raw_parts(start, capacity) };
FfiRpcHandler {
callback: Self::trampoline::<F>,
drop: Self::drop_handler::<F>,
closure_pointer: closure_pointer.cast(),
path,
plugin_name,
service_name,
plugin_version,
route_repr,
string_storage: string_storage.into(),
identifier,
}
}
......@@ -299,13 +227,7 @@ impl FfiRpcHandler {
(*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
}
let (pointer, capacity) = (*handler).string_storage.into_raw_parts();
// Note: we pretend the original Vec was filled to capacity which
// may or may not be true, there might have been some unitialized
// data at the end. But this doesn't matter in this case because we
// just want to drop the data, and only the capacity matters.
let string_storage = Vec::from_raw_parts(pointer, capacity, capacity);
drop(string_storage);
(*handler).identifier.drop();
}
}
......@@ -330,34 +252,153 @@ impl FfiRpcHandler {
Ok(result)
}
}
/// **For internal use**.
///
/// Use [`RouteBuilder`] instead.
///
/// This struct stores an RPC route identifier in the following packed format:
/// `{plugin}.{service}{path}{version}`. This format allows for efficient
/// extraction of the RPC route identifier for purposes of logging (note that
/// version is not displayed), and also losslessly stores info about all the
/// parts of the idenifier.
///
/// This represnetation also adds a constraint on the maximum length of the
/// plugin name, service name and path, each one of them must not be longer than
/// 65535 bytes (which is obviously engough for anybody).
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct PackedServiceIdentifier {
pub storage: FfiSafeStr,
pub plugin_len: u16,
pub service_len: u16,
pub path_len: u16,
pub version_len: u16,
}
impl PackedServiceIdentifier {
fn pack(path: &str, plugin: &str, service: &str, version: &str) -> Result<Self, BoxError> {
let Ok(plugin_len) = plugin.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("plugin name length must not exceed 65535, got {}", plugin.len())));
};
let Ok(service_len) = service.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("service name length must not exceed 65535, got {}", service.len())));
};
let Ok(path_len) = path.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("route path length must not exceed 65535, got {}", path.len())));
};
let Ok(version_len) = version.len().try_into() else {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("version string length must not exceed 65535, got {}", version.len())));
};
let total_string_len = plugin_len
// For an extra '.' between plugin and service names
+ 1
+ service_len
+ path_len
+ version_len;
let mut string_storage = Vec::with_capacity(total_string_len as _);
string_storage.extend_from_slice(plugin.as_bytes());
string_storage.push(b'.');
string_storage.extend_from_slice(service.as_bytes());
string_storage.extend_from_slice(path.as_bytes());
string_storage.extend_from_slice(version.as_bytes());
let start = string_storage.as_mut_ptr();
let capacity = string_storage.capacity();
// Safety: vec has an allocated buffer, so the pointer cannot be null.
// Also a concatenation of utf8 strings is always a utf8 string.
let storage =
unsafe { FfiSafeStr::from_raw_parts(NonNull::new_unchecked(start), capacity) };
// Self now owns this data and will be freeing it in it's `drop`.
std::mem::forget(string_storage);
Ok(Self {
storage,
plugin_len,
service_len,
path_len,
version_len,
})
}
#[allow(unreachable_code)]
pub(crate) fn drop(&mut self) {
let (pointer, capacity) = self.storage.into_raw_parts();
if capacity == 0 {
#[cfg(debug_assertions)]
unreachable!("drop should only be called once");
return;
}
// Note: we pretend the original Vec was filled to capacity which
// may or may not be true, there might have been some unitialized
// data at the end. But this doesn't matter in this case because we
// just want to drop the data, and only the capacity matters.
// Safety: safe because drop only happens once and the next time the
// pointer will be set to null.
unsafe {
let string_storage = Vec::from_raw_parts(pointer, capacity, capacity);
drop(string_storage);
}
// Overwrite with len = 0, to guard against double free
self.storage = FfiSafeStr::from("");
}
#[inline(always)]
pub fn path(&self) -> &str {
fn storage_slice(&self, start: u16, len: u16) -> &str {
// SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing
unsafe { self.path.as_str() }
let storage = unsafe { self.storage.as_str() };
let end = (start + len) as usize;
&storage[start as usize..end]
}
#[inline(always)]
pub fn plugin(&self) -> &str {
// SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing
unsafe { self.plugin_name.as_str() }
self.storage_slice(0, self.plugin_len)
}
#[inline(always)]
pub fn service(&self) -> &str {
// SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing
unsafe { self.service_name.as_str() }
self.storage_slice(self.plugin_len + 1, self.service_len)
}
#[inline(always)]
pub fn path(&self) -> &str {
self.storage_slice(self.plugin_len + 1 + self.service_len, self.path_len)
}
#[inline(always)]
pub fn route_repr(&self) -> &str {
// SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing
unsafe { self.route_repr.as_str() }
self.storage_slice(0, self.plugin_len + 1 + self.service_len + self.path_len)
}
#[inline(always)]
pub fn version(&self) -> &str {
// SAFETY: data is alive for the lifetime of `&self`, and borrow checker does it's thing
unsafe { self.plugin_version.as_str() }
self.storage_slice(
self.plugin_len + 1 + self.service_len + self.path_len,
self.version_len,
)
}
}
impl std::fmt::Display for PackedServiceIdentifier {
#[inline(always)]
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}.{}:v{}{}",
self.plugin(),
self.service(),
self.version(),
self.path()
)
}
}
......@@ -68,17 +68,17 @@ pub fn proc_rpc_dispatch_impl(args: &RawBytes) -> Result<&'static RawBytes, TntE
// SAFETY: safe because it doesn't outlive `args`
let v_requestor = unsafe { context.plugin_version.as_str() };
if handler.version() != v_requestor {
let v_handler = handler.identifier.version();
if v_handler != v_requestor {
return Err(BoxError::new(ErrorCode::WrongPluginVersion, format!("RPC request to an endpoint `{plugin}.{service}{path}` with incompatible version (requestor: {v_requestor}, handler: {v_handler})",
plugin=key.plugin,
service=key.service,
v_handler=handler.version(),
)).into());
}
// TODO: check service is not poisoned
fiber::set_name(handler.route_repr());
fiber::set_name(handler.identifier.route_repr());
let output = handler
.call(input, &context)
.map_err(|()| BoxError::last())?;
......@@ -126,33 +126,34 @@ unsafe fn handlers_mut() -> &'static mut RpcHandlerMap {
}
pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> {
if handler.path().is_empty() {
let identifier = &handler.identifier;
if identifier.path().is_empty() {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route path cannot be empty"));
} else if !handler.path().starts_with('/') {
} else if !identifier.path().starts_with('/') {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("RPC route path must start with '/', got '{}'", handler.path())));
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, format!("RPC route path must start with '/', got '{}'", identifier.path())));
}
if handler.plugin().is_empty() {
if identifier.plugin().is_empty() {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route plugin name cannot be empty"));
}
if handler.service().is_empty() {
if identifier.service().is_empty() {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service name cannot be empty"));
}
if handler.version().is_empty() {
if identifier.version().is_empty() {
#[rustfmt::skip]
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty"));
}
let key = RpcHandlerKey {
plugin: handler.plugin(),
service: handler.service(),
path: handler.path(),
plugin: identifier.plugin(),
service: identifier.service(),
path: identifier.path(),
};
// SAFETY: this is safe as long as we never let users touch `RpcHandlerKey`,
......@@ -168,35 +169,23 @@ pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> {
let key = e.key();
let old_handler = e.get();
let v_old = old_handler.identifier.version();
let v_new = handler.identifier.version();
#[rustfmt::skip]
if old_handler.version() != handler.version() {
let message = format!("RPC endpoint `{plugin}.{service}{path}` is already registered with a different version (old: {old_version}, new: {new_version})", plugin=key.plugin, service=key.service, path=key.path, old_version=old_handler.version(), new_version=handler.version());
if v_old != v_new {
let message = format!("RPC endpoint `{plugin}.{service}{path}` is already registered with a different version (old: {v_old}, new: {v_new})", plugin=key.plugin, service=key.service, path=key.path);
return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message));
} else if old_handler.identity() != handler.identity() {
let message = format!("RPC endpoint `{plugin}.{service}:v{version}{path}` is already registered with a different handler", plugin = key.plugin, service = key.service, version = old_handler.version(), path = key.path);
let message = format!("RPC endpoint `{}` is already registered with a different handler", old_handler.identifier);
return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message));
} else {
tlog!(
Info,
"RPC endpoint `{plugin}.{service}:v{version}{path}` is already registered",
plugin = handler.plugin(),
service = handler.service(),
version = handler.version(),
path = handler.path(),
);
tlog!(Info, "RPC endpoint `{}` is already registered", handler.identifier);
return Ok(());
};
}
};
tlog!(
Info,
"registered RPC endpoint `{}.{}:v{}{}`",
handler.plugin(),
handler.service(),
handler.version(),
handler.path(),
);
tlog!(Info, "registered RPC endpoint `{}`", handler.identifier);
entry.insert(Rc::new(handler));
Ok(())
}
......@@ -205,18 +194,11 @@ pub fn unregister_all_rpc_handlers(plugin_name: &str, service_name: &str, plugin
// SAFETY: safe because we don't leak any references to the stored data
let handlers = unsafe { handlers_mut() };
handlers.retain(|_, handler| {
let matches = handler.plugin() == plugin_name
&& handler.service() == service_name
&& handler.version() == plugin_version;
let matches = handler.identifier.plugin() == plugin_name
&& handler.identifier.service() == service_name
&& handler.identifier.version() == plugin_version;
if matches {
tlog!(
Info,
"unregistered RPC endpoint `{}.{}-v{}{}`",
handler.plugin(),
handler.service(),
handler.version(),
handler.path(),
);
tlog!(Info, "unregistered RPC endpoint `{}`", handler.identifier);
// Don't retain
false
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment