diff --git a/src/audit.rs b/src/audit.rs index 028b5e193b93b042cd92236950b057a3ad9fbdc0..f32a455de47bee3b99b0db8a58a0eee2759f7487 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -1,6 +1,6 @@ use crate::traft::LogicalClock; -use once_cell::sync::OnceCell; use std::ffi::{CStr, CString}; +use std::sync::{Mutex, OnceLock}; use tarantool::{error::TarantoolError, log::SayLevel}; /// Tarantool's low-level APIs. @@ -299,15 +299,18 @@ impl slog::Drain for Log { } // Note: we don't want to expose these implementation details. -static ROOT: OnceCell<slog::Logger> = OnceCell::new(); -static mut CLOCK: OnceCell<LogicalClock> = OnceCell::new(); +static ROOT: OnceLock<slog::Logger> = OnceLock::new(); +static CLOCK: Mutex<Option<LogicalClock>> = Mutex::new(None); /// Generate next unique record id. fn next_unique_id() -> Option<LogicalClock> { - // SAFETY: we'll call this only from TX thread. - let clock = unsafe { CLOCK.get_mut()? }; - clock.inc(); - Some(*clock) + match &mut *CLOCK.lock().unwrap() { + None => None, + Some(clock) => { + clock.inc(); + Some(*clock) + } + } } /// A public log drain for the [`crate::audit!`] macro. @@ -388,13 +391,14 @@ macro_rules! audit( /// Note: `config` will be parsed by tarantool's core (see `say.c`). pub fn init(config: &str, raft_id: u64, raft_gen: u64) { // Note: this'll only fail if the cell's already set (shouldn't be possible). - // SAFETY: this is the first time we access this variable, and it's - // always done from the main (TX) thread. - unsafe { - CLOCK - .set(LogicalClock::new(raft_id, raft_gen)) - .expect("failed to initialize global audit event id generator"); - } + let old_clock = CLOCK + .lock() + .unwrap() + .replace(LogicalClock::new(raft_id, raft_gen)); + assert!( + old_clock.is_none(), + "global audit event id generator must be initialized only once" + ); let config = CString::new(config).expect("audit log config contains nul"); let log = Log::new(config).expect("failed to create audit log"); diff --git a/src/cas.rs b/src/cas.rs index 4974275eb94110540581b12089fc2c535c95b225..23f319fb5e3efe4edbcd38d89962edd882445915 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -16,6 +16,7 @@ use ::raft::prelude as raft; use ::raft::Error as RaftError; use ::raft::GetEntriesContext; use ::raft::StorageError; +use std::sync::OnceLock; use tarantool::error::Error as TntError; use tarantool::error::TarantoolErrorCode; use tarantool::fiber; @@ -676,11 +677,12 @@ pub fn check_predicate( continue; } let key_def = storage::cached_key_def_for_key(space, 0)?; - for key in schema_related_property_keys() { + let conflict_found = SCHEMA_RELATED_PROPERTY_KEYS.with(|keys| { // NOTE: this is just a string comparison - if range.contains(&key_def, key) { - return Err(Error::ConflictFound(entry_index)); - } + keys.iter().any(|key| range.contains(&key_def, key)) + }); + if conflict_found { + return Err(Error::ConflictFound(entry_index)); } } Op::Plugin { .. } => { @@ -689,9 +691,10 @@ pub fn check_predicate( continue; } let key_def = storage::cached_key_def_for_key(space, 0)?; - let key = pending_plugin_operation_key(); // NOTE: this is just a string comparison - if range.contains(&key_def, key) { + let conflict_found = + PENDING_PLUGIN_OPERATION.with(|key| range.contains(&key_def, key)); + if conflict_found { return Err(Error::ConflictFound(entry_index)); } } @@ -708,54 +711,28 @@ const SCHEMA_RELATED_PROPERTIES: [&str; 4] = [ crate::storage::PropertyName::NextSchemaVersion.as_str(), ]; -/// Returns a slice of tuples representing keys of space _pico_property which -/// should be used to check predicates of schema changing CaS operations. -fn schema_related_property_keys() -> &'static [Tuple] { - static mut DATA: Option<Vec<Tuple>> = None; - - // Safety: we only call this from tx thread, so it's ok, trust me - unsafe { - if DATA.is_none() { - let mut data = Vec::with_capacity(SCHEMA_RELATED_PROPERTIES.len()); - for key in SCHEMA_RELATED_PROPERTIES { - let t = Tuple::new(&(key,)).expect("keys should convert to tuple"); - data.push(t); - } - DATA = Some(data); - } +thread_local! { + /// A slice of tuples representing keys of space _pico_property which + /// should be used to check predicates of schema changing CaS operations. + static SCHEMA_RELATED_PROPERTY_KEYS: Vec<Tuple> = + SCHEMA_RELATED_PROPERTIES.iter() + .map(|key| Tuple::new(&(key,)).expect("keys should convert to tuple")) + .collect(); - DATA.as_ref().unwrap() - } -} - -fn pending_plugin_operation_key() -> &'static Tuple { - static mut TUPLE: Option<Tuple> = None; - // Safety: only called from main thread - unsafe { - TUPLE.get_or_insert_with(|| { - Tuple::new(&[storage::PropertyName::PendingPluginOperation]).expect("cannot fail") - }) - } + static PENDING_PLUGIN_OPERATION: Tuple = + Tuple::new(&[storage::PropertyName::PendingPluginOperation]).expect("cannot fail"); } /// Returns a slice of [`Range`] structs which are needed for the CaS /// request which performs a schema change operation. pub fn schema_change_ranges() -> &'static [Range] { - static mut DATA: Option<Vec<Range>> = None; - - // Safety: we only call this from tx thread, so it's ok, trust me - unsafe { - if DATA.is_none() { - let mut data = Vec::with_capacity(SCHEMA_RELATED_PROPERTIES.len()); - for key in SCHEMA_RELATED_PROPERTIES { - let r = Range::new(ClusterwideTable::Property).eq((key,)); - data.push(r); - } - DATA = Some(data); - } - - DATA.as_ref().unwrap() - } + static DATA: OnceLock<Vec<Range>> = OnceLock::new(); + DATA.get_or_init(|| { + SCHEMA_RELATED_PROPERTIES + .iter() + .map(|key| Range::new(ClusterwideTable::Property).eq((key,))) + .collect() + }) } /// Represents a lua table describing a [`Range`]. diff --git a/src/config.rs b/src/config.rs index bd282a3fa9d5310ac58124840d6cda1964d61887..b0a890624f3dc5ceb52d581aeffebadbc3b8d300 100644 --- a/src/config.rs +++ b/src/config.rs @@ -32,6 +32,7 @@ use std::path::Path; use std::path::PathBuf; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::OnceLock; use tarantool::log::SayLevel; use tarantool::tuple::Tuple; @@ -80,7 +81,7 @@ fn validate_args(args: &args::Run) -> Result<(), Error> { Ok(()) } -static mut GLOBAL_CONFIG: Option<Box<PicodataConfig>> = None; +static GLOBAL_CONFIG: OnceLock<Box<PicodataConfig>> = OnceLock::new(); impl PicodataConfig { pub fn init(args: args::Run) -> Result<&'static Self, Error> { @@ -156,24 +157,14 @@ Using configuration file '{args_path}'."); config.parameter_sources = parameter_sources; - // Safe, because we only initialize config once in a single thread. - let config_ref = unsafe { - assert!(GLOBAL_CONFIG.is_none()); - GLOBAL_CONFIG.insert(config) - }; - - Ok(config_ref) + Ok(GLOBAL_CONFIG.get_or_init(|| config)) } #[inline(always)] pub fn get() -> &'static Self { - // Safe, because we only mutate GLOBAL_CONFIG once and - // strictly before anybody calls this function. - unsafe { - GLOBAL_CONFIG - .as_ref() - .expect("shouldn't be called before config is initialized") - } + GLOBAL_CONFIG + .get() + .expect("shouldn't be called before config is initialized") } /// Generates the configuration with parameters set to the default values @@ -813,11 +804,9 @@ Using configuration file '{args_path}'."); // For the tests pub(crate) fn init_for_tests() { let config = Box::new(Self::with_defaults()); - // Safe, because we only initialize config once in a single thread. - unsafe { - assert!(GLOBAL_CONFIG.is_none()); - let _ = GLOBAL_CONFIG.insert(config); - }; + GLOBAL_CONFIG + .set(config) + .expect("global config should be initialized only once"); } } diff --git a/src/lib.rs b/src/lib.rs index a28f91cc2939631260edb7565d88f9b03c5c54fd..1dea6795f7abcc077fbf5e34a9ae40a7e4b97d47 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -289,9 +289,6 @@ fn start_webui() { /// Those are used for inter-instance communication /// (discovery, rpc, public proc api). fn init_handlers() { - plugin::rpc::server::init_handlers(); - plugin::metrics::init_handlers(); - rpc::init_static_proc_set(); let lua = ::tarantool::lua_state(); diff --git a/src/pico_service.rs b/src/pico_service.rs index 5dc1c115757b0c50f3e6faf5fa14a005f172ef8e..33f3238fc421ecf820922e4c905a9d5397493361 100644 --- a/src/pico_service.rs +++ b/src/pico_service.rs @@ -1,23 +1,26 @@ use crate::tlog; use crate::traft::error::Error; use crate::unwrap_ok_or; +use std::cell::RefCell; use std::fs::File; use std::io::Read; use std::os::unix::fs::PermissionsExt as _; use std::path::Path; +use std::rc::Rc; -/// Password of the special system user "pico_service". -/// -/// It is stored in a global variable, because we need to access it from -/// different places in code when creating iproto connections to other instances. -// TODO: for chap-sha authentication method we only need to store the sha1 hash -// of the password, but our iproto clients don't yet support this and also sha1 -// is not a secure hash anyway, so ... -static mut PICO_SERVICE_PASSWORD: Option<String> = None; +thread_local! { + /// Password of the special system user "pico_service". + /// + /// It is stored in a global variable, because we need to access it from + /// different places in code when creating iproto connections to other instances. + // TODO: for chap-sha authentication method we only need to store the sha1 hash + // of the password, but our iproto clients don't yet support this and also sha1 + // is not a secure hash anyway, so ... + static PICO_SERVICE_PASSWORD: RefCell<Rc<str>> = RefCell::new(Rc::from("")); +} -#[inline(always)] -pub(crate) fn pico_service_password() -> &'static str { - unsafe { PICO_SERVICE_PASSWORD.as_deref() }.unwrap_or("") +pub(crate) fn pico_service_password() -> Rc<str> { + PICO_SERVICE_PASSWORD.with_borrow(Rc::clone) } pub(crate) fn read_pico_service_password_from_file( @@ -69,9 +72,7 @@ pub(crate) fn read_pico_service_password_from_file( )); } - unsafe { - PICO_SERVICE_PASSWORD = Some(password.into()); - } + PICO_SERVICE_PASSWORD.set(Rc::from(password)); Ok(()) } diff --git a/src/plugin/metrics.rs b/src/plugin/metrics.rs index d9df9cfb95a673d40ec3c3c89f7a10c19fbd4095..27794b72957dc6997adc7dc9581c220813465936 100644 --- a/src/plugin/metrics.rs +++ b/src/plugin/metrics.rs @@ -2,31 +2,18 @@ use crate::tlog; use picodata_plugin::metrics::FfiMetricsHandler; use picodata_plugin::plugin::interface::ServiceId; use picodata_plugin::util::RegionGuard; +use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::rc::Rc; use tarantool::error::BoxError; use tarantool::error::TarantoolErrorCode; -use tarantool::fiber::mutex::MutexGuard; -use tarantool::fiber::Mutex; -static mut HANDLERS: Option<Mutex<MetricsHandlerMap>> = None; - -type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>; - -pub(crate) fn init_handlers() { - unsafe { - HANDLERS = Some(Mutex::new(HashMap::new())); - } +thread_local! { + static HANDLERS: RefCell<MetricsHandlerMap> = RefCell::new(MetricsHandlerMap::new()); } -#[inline] -fn handlers() -> MutexGuard<'static, MetricsHandlerMap> { - // SAFETY: global variable access: safe in tx thread. - let handlers = unsafe { HANDLERS.as_ref() }; - let handlers = handlers.expect("should be initialized at startup"); - handlers.lock() -} +type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>; pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxError> { let identifier = &handler.identifier; @@ -50,35 +37,42 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty")); } - let mut handlers = handlers(); let service_id = identifier.service_id(); - let entry = handlers.entry(service_id); - let entry = match entry { - Entry::Vacant(e) => e, - Entry::Occupied(e) => { - let service_id = e.key(); - let old_handler = e.get(); - #[rustfmt::skip] - if old_handler.identity() != handler.identity() { - let message = format!("metrics handler for `{service_id}` is already registered with a different handler"); - return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); - } else { - tlog!(Info, "metrics handler `{service_id}` is already registered"); - return Ok(()); - }; + HANDLERS.with_borrow_mut(|handlers| { + match handlers.entry(service_id) { + Entry::Vacant(e) => { + let service_id = e.key(); + tlog!(Info, "registered metrics handler for `{service_id}`"); + e.insert(Rc::new(handler)); + Ok(()) + } + + Entry::Occupied(e) => { + let service_id = e.key(); + let old_handler = e.get(); + // Note: inequality of function pointer addresses doesn't mean that the functions + // are actually different. The same function may be instantiated in different + // codegen units independently at different addresses. There is no guarantee + // whether the resulting functions would be de-duplicated or kept separate. + if old_handler.identity() != handler.identity() { + let message = format!( + "metrics handler for `{service_id}` is already registered, \ + with a (possibly) different handler" + ); + Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)) + } else { + tlog!(Info, "metrics handler `{service_id}` is already registered"); + Ok(()) + } + } } - }; - - let service_id = entry.key(); - tlog!(Info, "registered metrics handler for `{service_id}`"); - entry.insert(Rc::new(handler)); - Ok(()) + }) } pub fn unregister_metrics_handler(service_id: &ServiceId) { // SAFETY: global variable access: safe in tx thread. - let handler = handlers().remove(service_id); + let handler = HANDLERS.with_borrow_mut(|handlers| handlers.remove(service_id)); if handler.is_some() { tlog!(Info, "unregistered metrics handler for `{service_id}`"); } @@ -89,11 +83,9 @@ pub fn get_plugin_metrics() -> String { let mut res = String::new(); - let handlers = handlers(); // Note: must first copy all references to a local vec, because the callbacks may yield. - let handler_copies: Vec<_> = handlers.values().cloned().collect(); - // Release the lock. - drop(handlers); + let handler_copies: Vec<_> = + HANDLERS.with_borrow(|handlers| handlers.values().cloned().collect()); for handler in handler_copies { let Ok(data) = handler.call() else { diff --git a/src/plugin/rpc/server.rs b/src/plugin/rpc/server.rs index 7c9127a6f4b8d1c98ece5ca397eac2dc416e53d5..8d26ba4bf5c93c0b7ee31c04bd04617a70c7b0aa 100644 --- a/src/plugin/rpc/server.rs +++ b/src/plugin/rpc/server.rs @@ -3,6 +3,7 @@ use crate::tlog; use picodata_plugin::transport::context::FfiSafeContext; use picodata_plugin::transport::rpc::server::FfiRpcHandler; use picodata_plugin::util::RegionBuffer; +use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::rc::Rc; @@ -54,11 +55,7 @@ pub fn proc_rpc_dispatch_impl(args: &RawBytes) -> Result<&'static RawBytes, TntE } }; - // SAFETY: safe because - // - keys don't leak - // - we don't hold on to a reference to the global data, and other fibers - // may safely mutate the HANDLERS hashmap. - let maybe_handler = unsafe { handlers_mut().get(&key).cloned() }; + let maybe_handler = HANDLERS.with_borrow(|handlers| handlers.get(&key).cloned()); let Some(handler) = maybe_handler else { #[rustfmt::skip] @@ -104,7 +101,9 @@ pub fn proc_rpc_dispatch(args: &RawBytes) -> Result<&'static RawBytes, TntError> // handler storage //////////////////////////////////////////////////////////////////////////////// -static mut HANDLERS: Option<RpcHandlerMap> = None; +thread_local! { + static HANDLERS: RefCell<RpcHandlerMap> = RefCell::new(RpcHandlerMap::new()); +} type RpcHandlerMap = HashMap<RpcHandlerKey<'static>, Rc<FfiRpcHandler>>; @@ -115,18 +114,8 @@ struct RpcHandlerKey<'a> { path: &'a str, } -pub(crate) fn init_handlers() { - unsafe { - HANDLERS = Some(HashMap::new()); - } -} - -unsafe fn handlers_mut() -> &'static mut RpcHandlerMap { - HANDLERS.as_mut().expect("should be initialized at startup") -} - pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { - let identifier = &handler.identifier; + let identifier = handler.identifier; if identifier.path().is_empty() { #[rustfmt::skip] return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route path cannot be empty")); @@ -156,56 +145,68 @@ pub fn register_rpc_handler(handler: FfiRpcHandler) -> Result<(), BoxError> { path: identifier.path(), }; - // SAFETY: this is safe as long as we never let users touch `RpcHandlerKey`, - // it must not outlive the `handler`, which should be fine because they're - // stored together in the hash map. - let entry = unsafe { - handlers_mut().entry(std::mem::transmute::<RpcHandlerKey, RpcHandlerKey<'static>>(key)) - }; - - let entry = match entry { - Entry::Vacant(e) => e, - Entry::Occupied(e) => { - let key = e.key(); - let old_handler = e.get(); - - let v_old = old_handler.identifier.version(); - let v_new = handler.identifier.version(); - #[rustfmt::skip] - if v_old != v_new { - let message = format!("RPC endpoint `{plugin}.{service}{path}` is already registered with a different version (old: {v_old}, new: {v_new})", plugin=key.plugin, service=key.service, path=key.path); - return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); - } else if old_handler.identity() != handler.identity() { - let message = format!("RPC endpoint `{}` is already registered with a different handler", old_handler.identifier); - return Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)); - } else { - tlog!(Info, "RPC endpoint `{}` is already registered", handler.identifier); - return Ok(()); - }; + HANDLERS.with_borrow_mut(|handlers| { + // SAFETY: this is safe as long as we never let users touch `RpcHandlerKey`, + // it must not outlive the `handler`, which should be fine because they're + // stored together in the hash map. + let handler_key = + unsafe { std::mem::transmute::<RpcHandlerKey<'_>, RpcHandlerKey<'static>>(key) }; + match handlers.entry(handler_key) { + Entry::Vacant(e) => { + tlog!(Info, "registered RPC endpoint `{}`", handler.identifier); + e.insert(Rc::new(handler)); + Ok(()) + } + Entry::Occupied(e) => { + let key = e.key(); + let old_handler = e.get(); + + let v_old = old_handler.identifier.version(); + let v_new = handler.identifier.version(); + if v_old != v_new { + let message = format!( + "RPC endpoint `{plugin}.{service}{path}` is already registered with a \ + different version (old: {v_old}, new: {v_new})", + plugin = key.plugin, + service = key.service, + path = key.path, + ); + Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)) + } else if old_handler.identity() != handler.identity() { + let message = format!( + "RPC endpoint `{}` is already registered with a different handler", + old_handler.identifier, + ); + Err(BoxError::new(TarantoolErrorCode::FunctionExists, message)) + } else { + tlog!( + Info, + "RPC endpoint `{}` is already registered", + handler.identifier + ); + Ok(()) + } + } } - }; - - tlog!(Info, "registered RPC endpoint `{}`", handler.identifier); - entry.insert(Rc::new(handler)); - Ok(()) + }) } pub fn unregister_all_rpc_handlers(plugin_name: &str, service_name: &str, plugin_version: &str) { - // SAFETY: safe because we don't leak any references to the stored data - let handlers = unsafe { handlers_mut() }; - handlers.retain(|_, handler| { - let matches = handler.identifier.plugin() == plugin_name - && handler.identifier.service() == service_name - && handler.identifier.version() == plugin_version; - if matches { - tlog!(Info, "unregistered RPC endpoint `{}`", handler.identifier); - // Don't retain - false - } else { - // Do retain - true - } - }) + HANDLERS.with_borrow_mut(|handlers| { + handlers.retain(|_, handler| { + let matches = handler.identifier.plugin() == plugin_name + && handler.identifier.service() == service_name + && handler.identifier.version() == plugin_version; + if matches { + tlog!(Info, "unregistered RPC endpoint `{}`", handler.identifier); + // Don't retain + false + } else { + // Do retain + true + } + }) + }); } //////////////////////////////////////////////////////////////////////////////// @@ -271,8 +272,6 @@ mod test { #[tarantool::test] fn rpc_handler_no_use_after_free() { - init_handlers(); - let plugin_name = "plugin"; let service_name = "service"; let plugin_version = "3.14.78-rc37.2"; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index fef0ab2719bd6a11ea82810451c518d79c3de25c..7bca618e6136570ad53573de56369e0c6c92ac18 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -23,6 +23,7 @@ use std::io; use serde::de::DeserializeOwned; use std::collections::HashSet; +use std::sync::OnceLock; pub mod ddl_apply; pub mod disable_service; @@ -37,8 +38,6 @@ pub mod sharding; pub mod snapshot; pub mod update_instance; -static mut STATIC_PROCS: Option<HashSet<String>> = None; - pub fn replicasets_masters<'a>( replicasets: &HashMap<&ReplicasetName, &'a Replicaset>, instances: &'a [Instance], @@ -68,31 +67,27 @@ pub fn replicasets_masters<'a>( masters } +// TODO: Change this into LazyLock once MSRV is big enough +// This will allow to remove separate initialization step, and possible panics. +static STATIC_PROCS: OnceLock<HashSet<String>> = OnceLock::new(); + #[inline(always)] pub fn init_static_proc_set() { - let mut map = HashSet::new(); - for proc in ::tarantool::proc::all_procs().iter() { - map.insert(format!(".{}", proc.name())); - } - - // Safety: safe as long as only called from tx thread - unsafe { - assert!(STATIC_PROCS.is_none()); - STATIC_PROCS = Some(map); - } + let all_procs = tarantool::proc::all_procs() + .iter() + .map(|proc| format!(".{}", proc.name())) + .collect(); + STATIC_PROCS + .set(all_procs) + .expect("proc list must be initialized once at startup"); } #[inline(always)] pub fn to_static_proc_name(name: &str) -> Option<&'static str> { - // Safety: safe as long as only called from tx thread - let name_ref = unsafe { - STATIC_PROCS - .as_ref() - .expect("should be initialized at startup") - .get(name)? - }; - - Some(name_ref) + let procs = STATIC_PROCS + .get() + .expect("should be initialized at startup"); + procs.get(name).map(String::as_str) } /// Types implementing this trait represent an RPC's (remote procedure call) @@ -165,7 +160,7 @@ where let mut config = Config::default(); config.creds = Some(( PICO_SERVICE_USER_NAME.into(), - pico_service_password().into(), + pico_service_password().as_ref().into(), )); let client = Client::connect_with_config(address, port, config).await?; diff --git a/src/schema.rs b/src/schema.rs index e9eb85148a2089fed21cbb7efa3b0bd98910c335..6e008a0175f3c08af66ed1539e32bee017123649 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1324,7 +1324,7 @@ pub fn system_user_definitions() -> Vec<(UserDef, Vec<PrivilegeDef>)> { AuthData::new( &AuthMethod::ChapSha1, PICO_SERVICE_USER_NAME, - pico_service_password(), + pico_service_password().as_ref(), ) .into_string(), )), diff --git a/src/storage.rs b/src/storage.rs index bb7ff3faf286a5501fd8aee25adaa9381a021523..765251d73753f9757a0741a5f2821037938ff2cd 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -46,7 +46,7 @@ use crate::util::Uppercase; use rmpv::Utf8String; use std::borrow::Cow; -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; @@ -595,27 +595,27 @@ fn cached_key_def_impl( let id = (space_id, index_id, kind); - static mut SYSTEM_KEY_DEFS: Option<KeyDefCache> = None; - // Safety: this is only called from main thread - let system_key_defs = unsafe { SYSTEM_KEY_DEFS.get_or_insert_with(HashMap::new) }; + thread_local! { + static SYSTEM_KEY_DEFS: RefCell<KeyDefCache> = RefCell::new(KeyDefCache::new()); + static USER_KEY_DEFS: RefCell<KeyDefCache> = RefCell::new(KeyDefCache::new()); + static SCHEMA_VERSION: Cell<u64> = Cell::new(0); + } + if ClusterwideTable::try_from(space_id).is_ok() { // System table definition's never change during a single // execution, so it's safe to cache these - let key_def = get_or_create_key_def(system_key_defs, id)?; + let key_def = + SYSTEM_KEY_DEFS.with_borrow_mut(|key_defs| get_or_create_key_def(key_defs, id))?; return Ok(key_def); } - static mut USER_KEY_DEFS: Option<(u64, KeyDefCache)> = None; - let (schema_version, user_key_defs) = - // Safety: this is only called from main thread - unsafe { USER_KEY_DEFS.get_or_insert_with(|| (0, HashMap::new())) }; let box_schema_version = box_schema_version(); - if *schema_version != box_schema_version { - user_key_defs.clear(); - *schema_version = box_schema_version; + if SCHEMA_VERSION.get() != box_schema_version { + USER_KEY_DEFS.with_borrow_mut(|key_defs| key_defs.clear()); + SCHEMA_VERSION.set(box_schema_version); } - let key_def = get_or_create_key_def(user_key_defs, id)?; + let key_def = USER_KEY_DEFS.with_borrow_mut(|key_defs| get_or_create_key_def(key_defs, id))?; Ok(key_def) } diff --git a/src/traft/network.rs b/src/traft/network.rs index d74415ae42680869b243d2e433d13147cd8c6d2a..cab7c8802f5493b00f5f922a439466cedd3c376d 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -197,7 +197,7 @@ impl PoolWorker { let mut config = Config::default(); config.creds = Some(( PICO_SERVICE_USER_NAME.into(), - pico_service_password().into(), + pico_service_password().as_ref().into(), )); config.connect_timeout = Some(call_timeout); let client = ReconnClient::with_config(address.clone(), port, config); diff --git a/test/testplug/src/lib.rs b/test/testplug/src/lib.rs index 85ac63c22aec6cb555562f98ee5386232cfd5f05..8d8d83f713b802e93b8acdadb737d9e285de9cac 100644 --- a/test/testplug/src/lib.rs +++ b/test/testplug/src/lib.rs @@ -442,7 +442,11 @@ impl Service for Service3 { let e = ctx .register_metrics_callback(|| "other_metrics 69".into()) .unwrap_err(); - assert_eq!(e.to_string(), "FunctionExists: metrics handler for `testplug_sdk.testservice_3:v0.1.0` is already registered with a different handler"); + assert_eq!( + e.to_string(), + "FunctionExists: metrics handler for `testplug_sdk.testservice_3:v0.1.0` \ + is already registered, with a (possibly) different handler", + ); #[derive(Clone)] struct DropCheck;