From 6fbaa26daab18c13adecbf595305bbf9714291f0 Mon Sep 17 00:00:00 2001 From: Fedor Telnov <f.telnov@picodata.io> Date: Wed, 24 Apr 2024 14:57:58 +0300 Subject: [PATCH] fix: move to tarantool Mutex in sbroad It syncs API with new sbroad's one. For more context refer to sbroad!424 --- sbroad | 2 +- src/sql.rs | 8 +++---- src/sql/router.rs | 55 ++++++++++++++++------------------------------- 3 files changed, 22 insertions(+), 43 deletions(-) diff --git a/sbroad b/sbroad index 0c57585af4..0d6f5acf57 160000 --- a/sbroad +++ b/sbroad @@ -1 +1 @@ -Subproject commit 0c57585af4fbfe449b0c0fdcdd56a3d84d766169 +Subproject commit 0d6f5acf57c41ff0f0e194057625b3183e0030d7 diff --git a/src/sql.rs b/src/sql.rs index ba6ee79839..cf204a7709 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -57,6 +57,7 @@ use ::tarantool::session::{with_su, UserId}; use ::tarantool::space::{FieldType, Space, SpaceId, SystemSpace}; use ::tarantool::time::Instant; use ::tarantool::tuple::{RawBytes, Tuple}; +use sbroad::utils::MutexLike; use std::rc::Rc; use std::str::FromStr; use tarantool::session; @@ -518,10 +519,7 @@ pub fn proc_pg_parse( &sql.clone(), || { let runtime = RouterRuntime::new().map_err(Error::from)?; - let mut cache = runtime - .cache() - .try_borrow_mut() - .map_err(|e| Error::Other(format!("runtime query cache: {e:?}").into()))?; + let mut cache = runtime.cache().lock(); if let Some(plan) = cache.get(&query.to_smolstr())? { let statement = Statement::new(id.to_string(), sql.clone(), plan.clone(), param_oids)?; @@ -529,7 +527,7 @@ pub fn proc_pg_parse( .with(|cache| cache.borrow_mut().put((cid, name.into()), statement))?; return Ok(()); } - let metadata = &*runtime.metadata().map_err(Error::from)?; + let metadata = &*runtime.metadata().lock(); let plan = with_su(ADMIN_ID, || -> traft::Result<IrPlan> { let mut plan = <RouterRuntime as Router>::ParseTree::transform_into_plan(&query, metadata) diff --git a/src/sql/router.rs b/src/sql/router.rs index 0e3c3bea4e..bee6cf2894 100644 --- a/src/sql/router.rs +++ b/src/sql/router.rs @@ -16,10 +16,12 @@ use sbroad::executor::protocol::Binary; use sbroad::frontend::sql::ast::AbstractSyntaxTree; use sbroad::ir::value::{MsgPackValue, Value}; use sbroad::ir::Plan; +use sbroad::utils::MutexLike; use smol_str::{format_smolstr, SmolStr, ToSmolStr}; +use tarantool::fiber::Mutex; use std::any::Any; -use std::cell::{Ref, RefCell}; + use std::collections::HashMap; use std::rc::Rc; @@ -50,8 +52,8 @@ use ::tarantool::util::Value as TarantoolValue; pub type VersionMap = HashMap<SmolStr, u64>; thread_local! { - static PLAN_CACHE: Rc<RefCell<PicoRouterCache >> = Rc::new( - RefCell::new(PicoRouterCache::new(DEFAULT_CAPACITY).unwrap())); + static PLAN_CACHE: Rc<Mutex<PicoRouterCache>> = Rc::new( + Mutex::new(PicoRouterCache::new(DEFAULT_CAPACITY).unwrap())); } pub const DEFAULT_BUCKET_COLUMN: &str = "bucket_id"; @@ -85,9 +87,9 @@ pub fn get_table_version(space_name: &str) -> Result<u64, SbroadError> { #[allow(clippy::module_name_repetitions)] pub struct RouterRuntime { - metadata: RefCell<RouterMetadata>, + metadata: Mutex<RouterMetadata>, bucket_count: u64, - ir_cache: Rc<RefCell<PicoRouterCache>>, + ir_cache: Rc<Mutex<PicoRouterCache>>, } impl RouterRuntime { @@ -99,7 +101,7 @@ impl RouterRuntime { let metadata = RouterMetadata::default(); let bucket_count = DEFAULT_BUCKET_COUNT; let runtime = PLAN_CACHE.with(|cache| RouterRuntime { - metadata: RefCell::new(metadata), + metadata: Mutex::new(metadata), bucket_count, ir_cache: cache.clone(), }); @@ -185,24 +187,16 @@ impl Cache<SmolStr, Plan> for PicoRouterCache { impl QueryCache for RouterRuntime { type Cache = PicoRouterCache; - fn cache(&self) -> &RefCell<Self::Cache> { - &self.ir_cache + fn cache(&self) -> &impl MutexLike<Self::Cache> { + &*self.ir_cache } fn cache_capacity(&self) -> Result<usize, SbroadError> { - Ok(self - .ir_cache - .try_borrow() - .map_err(|e| { - SbroadError::FailedTo(Action::Get, Some(Entity::Cache), format_smolstr!("{e:?}")) - })? - .capacity()) + Ok(self.cache().lock().capacity()) } fn clear_cache(&self) -> Result<(), SbroadError> { - *self.ir_cache.try_borrow_mut().map_err(|e| { - SbroadError::FailedTo(Action::Clear, Some(Entity::Cache), format_smolstr!("{e:?}")) - })? = Self::Cache::new(self.cache_capacity()?)?; + *self.ir_cache.lock() = Self::Cache::new(self.cache_capacity()?)?; Ok(()) } @@ -219,14 +213,8 @@ impl Router for RouterRuntime { type ParseTree = AbstractSyntaxTree; type MetadataProvider = RouterMetadata; - fn metadata(&self) -> Result<Ref<Self::MetadataProvider>, SbroadError> { - self.metadata.try_borrow().map_err(|e| { - SbroadError::FailedTo( - Action::Get, - Some(Entity::Metadata), - format_smolstr!("{e:?}"), - ) - }) + fn metadata(&self) -> &impl MutexLike<Self::MetadataProvider> { + &self.metadata } fn materialize_motion( @@ -256,14 +244,7 @@ impl Router for RouterRuntime { space: SmolStr, args: &'rec HashMap<SmolStr, Value>, ) -> Result<Vec<&'rec Value>, SbroadError> { - let metadata = self.metadata.try_borrow().map_err(|e| { - SbroadError::FailedTo( - Action::Borrow, - Some(Entity::Metadata), - format_smolstr!("{e:?}"), - ) - })?; - sharding_key_from_map(&*metadata, &space, args) + sharding_key_from_map(&*self.metadata().lock(), &space, args) } fn extract_sharding_key_from_tuple<'rec>( @@ -271,7 +252,7 @@ impl Router for RouterRuntime { space: SmolStr, args: &'rec [Value], ) -> Result<Vec<&'rec Value>, SbroadError> { - sharding_key_from_tuple(&*self.metadata()?, &space, args) + sharding_key_from_tuple(&*self.metadata().lock(), &space, args) } } @@ -318,7 +299,7 @@ impl Vshard for RouterRuntime { vtable_max_rows: u64, ) -> Result<Box<dyn Any>, SbroadError> { exec_ir_on_all_buckets( - &*self.metadata()?, + &*self.metadata().lock(), required, optional, query_type, @@ -363,7 +344,7 @@ impl Vshard for &RouterRuntime { vtable_max_rows: u64, ) -> Result<Box<dyn Any>, SbroadError> { exec_ir_on_all_buckets( - &*self.metadata()?, + &*self.metadata().lock(), required, optional, query_type, -- GitLab