From a2bfc8c98d396b5df2ddbf093c4f627a330a9dbb Mon Sep 17 00:00:00 2001 From: Anton Fetisov <a.fetisov@picodata.io> Date: Thu, 6 Feb 2025 15:43:14 +0300 Subject: [PATCH] refactor: change metric&rpc handlers to use thread locals instead of static mut --- src/lib.rs | 3 - src/plugin/metrics.rs | 78 +++++++++++------------ src/plugin/rpc/server.rs | 129 +++++++++++++++++++-------------------- test/testplug/src/lib.rs | 6 +- 4 files changed, 104 insertions(+), 112 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a28f91cc29..1dea6795f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -289,9 +289,6 @@ fn start_webui() { /// Those are used for inter-instance communication /// (discovery, rpc, public proc api). fn init_handlers() { - plugin::rpc::server::init_handlers(); - plugin::metrics::init_handlers(); - rpc::init_static_proc_set(); let lua = ::tarantool::lua_state(); diff --git a/src/plugin/metrics.rs b/src/plugin/metrics.rs index d9df9cfb95..27794b7295 100644 --- a/src/plugin/metrics.rs +++ b/src/plugin/metrics.rs @@ -2,31 +2,18 @@ use crate::tlog; use picodata_plugin::metrics::FfiMetricsHandler; use picodata_plugin::plugin::interface::ServiceId; use picodata_plugin::util::RegionGuard; +use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::rc::Rc; use tarantool::error::BoxError; use tarantool::error::TarantoolErrorCode; -use tarantool::fiber::mutex::MutexGuard; -use tarantool::fiber::Mutex; -static mut HANDLERS: Option<Mutex<MetricsHandlerMap>> = None; - -type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>; - -pub(crate) fn init_handlers() { - unsafe { - HANDLERS = Some(Mutex::new(HashMap::new())); - } +thread_local! { + static HANDLERS: RefCell<MetricsHandlerMap> = RefCell::new(MetricsHandlerMap::new()); } -#[inline] -fn handlers() -> MutexGuard<'static, MetricsHandlerMap> { - // SAFETY: global variable access: safe in tx thread. - let handlers = unsafe { HANDLERS.as_ref() }; - let handlers = handlers.expect("should be initialized at startup"); - handlers.lock() -} +type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>; pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxError> { let identifier = &handler.identifier; @@ -50,35 +37,42 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty")); } - let mut handlers = handlers(); let service_id = identifier.service_id(); - let entry = handlers.entry(service_id); - let entry = match entry { - Entry::Vacant(e) => e, - Entry::Occupied(e) => { - let service_id = e.key(); - let old_handler = e.get(); - #[rustfmt::skip] - if old_handler.identity() != handler.identity() { - let message = format!("metrics handler for `{service_id}` is already registered with a different handler"); - return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); - } else { - tlog!(Info, "metrics handler `{service_id}` is already registered"); - return Ok(()); - }; + HANDLERS.with_borrow_mut(|handlers| { + match handlers.entry(service_id) { + Entry::Vacant(e) => { + let service_id = e.key(); + tlog!(Info, "registered metrics handler for `{service_id}`"); + e.insert(Rc::new(handler)); + Ok(()) + } + + Entry::Occupied(e) => { + let service_id = e.key(); + let old_handler = e.get(); + // Note: inequality of function pointer addresses doesn't mean that the functions + // are actually different. The same function may be instantiated in different + // codegen units independently at different addresses. There is no guarantee + // whether the resulting functions would be de-duplicated or kept separate. + if old_handler.identity() != handler.identity() { + let message = format!( + "metrics handler for `{service_id}` is already registered, \ + with a (possibly) different handler" + ); + Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)) + } else { + tlog!(Info, "metrics handler `{service_id}` is already registered"); + Ok(()) + } + } } - }; - - let service_id = entry.key(); - tlog!(Info, "registered metrics handler for `{service_id}`"); - entry.insert(Rc::new(handler)); - Ok(()) + }) } pub fn unregister_metrics_handler(service_id: &ServiceId) { // SAFETY: global variable access: safe in tx thread. - let handler = handlers().remove(service_id); + let handler = HANDLERS.with_borrow_mut(|handlers| handlers.remove(service_id)); if handler.is_some() { tlog!(Info, "unregistered metrics handler for `{service_id}`"); } @@ -89,11 +83,9 @@ pub fn get_plugin_metrics() -> String { let mut res = String::new(); - let handlers = handlers(); // Note: must first copy all references to a local vec, because the callbacks may yield. - let handler_copies: Vec<_> = handlers.values().cloned().collect(); - // Release the lock. - drop(handlers); + let handler_copies: Vec<_> = + HANDLERS.with_borrow(|handlers| handlers.values().cloned().collect()); for handler in handler_copies { let Ok(data) = handler.call() else { diff --git a/src/plugin/rpc/server.rs b/src/plugin/rpc/server.rs index 7c9127a6f4..8d26ba4bf5 100644 --- a/src/plugin/rpc/server.rs +++ b/src/plugin/rpc/server.rs @@ -3,6 +3,7 @@ use crate::tlog; use picodata_plugin::transport::context::FfiSafeContext; use picodata_plugin::transport::rpc::server::FfiRpcHandler; use picodata_plugin::util::RegionBuffer; +use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::rc::Rc; @@ -54,11 +55,7 @@ pub fn proc_rpc_dispatch_impl(args: &RawBytes) -> Result<&'static RawBytes, TntE } }; - // SAFETY: safe because - // - keys don't leak - // - we don't hold on to a reference to the global data, and other fibers - // may safely mutate the HANDLERS hashmap. - let maybe_handler = unsafe { handlers_mut().get(&key).cloned() }; + let maybe_handler = HANDLERS.with_borrow(|handlers| handlers.get(&key).cloned()); let Some(handler) = maybe_handler else { #[rustfmt::skip] @@ -104,7 +101,9 @@ pub fn proc_rpc_dispatch(args: &RawBytes) -> Result<&'static RawBytes, TntError> // handler storage //////////////////////////////////////////////////////////////////////////////// -static mut HANDLERS: Option<RpcHandlerMap> = None; +thread_local! { + static HANDLERS: RefCell<RpcHandlerMap> = RefCell::new(RpcHandlerMap::new()); +} type RpcHandlerMap = HashMap<RpcHandlerKey<'static>, Rc<FfiRpcHandler>>; @@ -115,18 +114,8 @@ struct RpcHandlerKey<'a> { path: &'a str, } -pub(crate) fn init_handlers() { - unsafe { - HANDLERS = Some(HashMap::new()); - } -} - -unsafe fn handlers_mut() -> &'static mut RpcHandlerMap { - HANDLERS.as_mut().expect("should be initialized at startup") -} - pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { - let identifier = &handler.identifier; + let identifier = handler.identifier; if identifier.path().is_empty() { #[rustfmt::skip] return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route path cannot be empty")); @@ -156,56 +145,68 @@ pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { path: identifier.path(), }; - // SAFETY: this is safe as long as we never let users touch `RpcHandlerKey`, - // it must not outlive the `handler`, which should be fine because they're - // stored together in the hash map. - let entry = unsafe { - handlers_mut().entry(std::mem::transmute::<RpcHandlerKey, RpcHandlerKey<'static>>(key)) - }; - - let entry = match entry { - Entry::Vacant(e) => e, - Entry::Occupied(e) => { - let key = e.key(); - let old_handler = e.get(); - - let v_old = old_handler.identifier.version(); - let v_new = handler.identifier.version(); - #[rustfmt::skip] - if v_old != v_new { - let message = format!("RPC endpoint `{plugin}.{service}{path}` is already registered with a different version (old: {v_old}, new: {v_new})", plugin=key.plugin, service=key.service, path=key.path); - return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); - } else if old_handler.identity() != handler.identity() { - let message = format!("RPC endpoint `{}` is already registered with a different handler", old_handler.identifier); - return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); - } else { - tlog!(Info, "RPC endpoint `{}` is already registered", handler.identifier); - return Ok(()); - }; + HANDLERS.with_borrow_mut(|handlers| { + // SAFETY: this is safe as long as we never let users touch `RpcHandlerKey`, + // it must not outlive the `handler`, which should be fine because they're + // stored together in the hash map. + let handler_key = + unsafe { std::mem::transmute::<RpcHandlerKey<'_>, RpcHandlerKey<'static>>(key) }; + match handlers.entry(handler_key) { + Entry::Vacant(e) => { + tlog!(Info, "registered RPC endpoint `{}`", handler.identifier); + e.insert(Rc::new(handler)); + Ok(()) + } + Entry::Occupied(e) => { + let key = e.key(); + let old_handler = e.get(); + + let v_old = old_handler.identifier.version(); + let v_new = handler.identifier.version(); + if v_old != v_new { + let message = format!( + "RPC endpoint `{plugin}.{service}{path}` is already registered with a \ + different version (old: {v_old}, new: {v_new})", + plugin = key.plugin, + service = key.service, + path = key.path, + ); + Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)) + } else if old_handler.identity() != handler.identity() { + let message = format!( + "RPC endpoint `{}` is already registered with a different handler", + old_handler.identifier, + ); + Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)) + } else { + tlog!( + Info, + "RPC endpoint `{}` is already registered", + handler.identifier + ); + Ok(()) + } + } } - }; - - tlog!(Info, "registered RPC endpoint `{}`", handler.identifier); - entry.insert(Rc::new(handler)); - Ok(()) + }) } pub fn unregister_all_rpc_handlers(plugin_name: &str, service_name: &str, plugin_version: &str) { - // SAFETY: safe because we don't leak any references to the stored data - let handlers = unsafe { handlers_mut() }; - handlers.retain(|_, handler| { - let matches = handler.identifier.plugin() == plugin_name - && handler.identifier.service() == service_name - && handler.identifier.version() == plugin_version; - if matches { - tlog!(Info, "unregistered RPC endpoint `{}`", handler.identifier); - // Don't retain - false - } else { - // Do retain - true - } - }) + HANDLERS.with_borrow_mut(|handlers| { + handlers.retain(|_, handler| { + let matches = handler.identifier.plugin() == plugin_name + && handler.identifier.service() == service_name + && handler.identifier.version() == plugin_version; + if matches { + tlog!(Info, "unregistered RPC endpoint `{}`", handler.identifier); + // Don't retain + false + } else { + // Do retain + true + } + }) + }); } //////////////////////////////////////////////////////////////////////////////// @@ -271,8 +272,6 @@ mod test { #[tarantool::test] fn rpc_handler_no_use_after_free() { - init_handlers(); - let plugin_name = "plugin"; let service_name = "service"; let plugin_version = "3.14.78-rc37.2"; diff --git a/test/testplug/src/lib.rs b/test/testplug/src/lib.rs index 85ac63c22a..8d8d83f713 100644 --- a/test/testplug/src/lib.rs +++ b/test/testplug/src/lib.rs @@ -442,7 +442,11 @@ impl Service for Service3 { let e = ctx .register_metrics_callback(|| "other_metrics 69".into()) .unwrap_err(); - assert_eq!(e.to_string(), "FunctionExists: metrics handler for `testplug_sdk.testservice_3:v0.1.0` is already registered with a different handler"); + assert_eq!( + e.to_string(), + "FunctionExists: metrics handler for `testplug_sdk.testservice_3:v0.1.0` \ + is already registered, with a (possibly) different handler", + ); #[derive(Clone)] struct DropCheck; -- GitLab