From c2770b9a4c4efbd845f3968ff4db722d3d9deb00 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Fri, 20 Jan 2023 15:37:18 +0700
Subject: [PATCH] fix: cache invalidation under concurrent queries

The configuration cache is a part of the executor's runtime.
Previously, quech query was holding a immutable borrow of the
runtime, while cache invalidation tried to get a mutable borrow
(apply_config). As a result, the cache invalidation always failed
to mutably borrow the runtime.
Current commit fixes the issue by wrapping the configuration in
a reference cell. As a result, apply_config now immutably borrows
the runtime and don't fail on this step.
---
 sbroad-benches/src/engine.rs                  | 43 +++++++++------
 sbroad-cartridge/src/api.rs                   |  7 +++
 sbroad-cartridge/src/api/helper.rs            |  9 +++-
 .../src/api/invalidate_cached_schema.rs       | 22 +++++---
 sbroad-cartridge/src/cartridge/router.rs      | 52 ++++++++++++-------
 sbroad-cartridge/src/cartridge/storage.rs     | 40 +++++++++-----
 sbroad-core/src/executor.rs                   |  3 +-
 sbroad-core/src/executor/engine.rs            | 22 ++++++--
 sbroad-core/src/executor/engine/mock.rs       | 40 ++++++++------
 9 files changed, 163 insertions(+), 75 deletions(-)

diff --git a/sbroad-benches/src/engine.rs b/sbroad-benches/src/engine.rs
index 33cb20972a..9dd78b30b8 100644
--- a/sbroad-benches/src/engine.rs
+++ b/sbroad-benches/src/engine.rs
@@ -1,9 +1,9 @@
 use std::any::Any;
-use std::cell::RefCell;
+use std::cell::{Ref, RefCell};
 use std::collections::HashMap;
 
 use sbroad::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan};
-use sbroad::errors::{Entity, SbroadError};
+use sbroad::errors::{Action, Entity, SbroadError};
 use sbroad::executor::bucket::Buckets;
 use sbroad::executor::engine::{
     normalize_name_from_sql, sharding_keys_from_map, sharding_keys_from_tuple, Configuration,
@@ -354,7 +354,7 @@ impl RouterConfigurationMock {
 
 #[allow(clippy::module_name_repetitions)]
 pub struct RouterRuntimeMock {
-    metadata: RouterConfigurationMock,
+    metadata: RefCell<RouterConfigurationMock>,
     virtual_tables: HashMap<usize, VirtualTable>,
     ir_cache: RefCell<LRUCache<String, Plan>>,
 }
@@ -371,16 +371,25 @@ impl std::fmt::Debug for RouterRuntimeMock {
 impl Configuration for RouterRuntimeMock {
     type Configuration = RouterConfigurationMock;
 
-    fn cached_config(&self) -> &Self::Configuration {
-        &self.metadata
+    fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError> {
+        self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })
     }
 
-    fn clear_config(&mut self) {
-        self.metadata.tables.clear();
+    fn clear_config(&self) -> Result<(), SbroadError> {
+        let mut metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        metadata.tables.clear();
+        Ok(())
     }
 
-    fn is_config_empty(&self) -> bool {
-        self.metadata.tables.is_empty()
+    fn is_config_empty(&self) -> Result<bool, SbroadError> {
+        let metadata = self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        Ok(metadata.tables.is_empty())
     }
 
     fn get_config(&self) -> Result<Option<Self::Configuration>, SbroadError> {
@@ -388,8 +397,12 @@ impl Configuration for RouterRuntimeMock {
         Ok(Some(config))
     }
 
-    fn update_config(&mut self, _config: Self::Configuration) {
-        self.metadata = RouterConfigurationMock::new();
+    fn update_config(&self, _config: Self::Configuration) -> Result<(), SbroadError> {
+        let mut cached_metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        *cached_metadata = RouterConfigurationMock::new();
+        Ok(())
     }
 }
 
@@ -446,7 +459,7 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         args: &'rec HashMap<String, Value>,
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_map(self.cached_config(), &space, args)
+        sharding_keys_from_map(&*self.cached_config()?, &space, args)
     }
 
     fn extract_sharding_keys_from_tuple<'engine, 'rec>(
@@ -454,11 +467,11 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         rec: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_tuple(self.cached_config(), &space, rec)
+        sharding_keys_from_tuple(&*self.cached_config()?, &space, rec)
     }
 
     fn determine_bucket_id(&self, s: &[&Value]) -> u64 {
-        bucket_id_by_tuple(s, self.metadata.bucket_count)
+        bucket_id_by_tuple(s, self.metadata.borrow().bucket_count)
     }
 }
 
@@ -475,7 +488,7 @@ impl RouterRuntimeMock {
     pub fn new() -> Self {
         let cache: LRUCache<String, Plan> = LRUCache::new(DEFAULT_CAPACITY, None).unwrap();
         RouterRuntimeMock {
-            metadata: RouterConfigurationMock::new(),
+            metadata: RefCell::new(RouterConfigurationMock::new()),
             virtual_tables: HashMap::new(),
             ir_cache: RefCell::new(cache),
         }
diff --git a/sbroad-cartridge/src/api.rs b/sbroad-cartridge/src/api.rs
index cbc1a7786a..bc4620af3b 100644
--- a/sbroad-cartridge/src/api.rs
+++ b/sbroad-cartridge/src/api.rs
@@ -2,8 +2,15 @@ use crate::cartridge::router::RouterRuntime;
 use crate::cartridge::storage::StorageRuntime;
 use std::cell::RefCell;
 
+#[derive(Default)]
+pub struct AsyncCommands {
+    pub invalidate_router_cache: bool,
+    pub invalidate_segment_cache: bool,
+}
+
 thread_local!(static COORDINATOR_ENGINE: RefCell<RouterRuntime> = RefCell::new(RouterRuntime::new().unwrap()));
 thread_local!(static SEGMENT_ENGINE: RefCell<StorageRuntime> = RefCell::new(StorageRuntime::new().unwrap()));
+thread_local!(static ASYNC_COMMANDS: RefCell<AsyncCommands> = RefCell::new(AsyncCommands::default()));
 
 pub mod calculate_bucket_id;
 pub mod exec_query;
diff --git a/sbroad-cartridge/src/api/helper.rs b/sbroad-cartridge/src/api/helper.rs
index 67c68f2437..9d5bbc41a3 100644
--- a/sbroad-cartridge/src/api/helper.rs
+++ b/sbroad-cartridge/src/api/helper.rs
@@ -42,7 +42,7 @@ where
     // a mutable reference to the engine.
     if let Some(config) = config {
         code = (*engine).with(|runtime| {
-            let mut runtime = match runtime.try_borrow_mut() {
+            let runtime = match runtime.try_borrow() {
                 Ok(runtime) => runtime,
                 Err(e) => {
                     return tarantool_error(&format!(
@@ -51,7 +51,12 @@ where
                     ));
                 }
             };
-            runtime.update_config(config);
+            if let Err(e) = runtime.update_config(config) {
+                return tarantool_error(&format!(
+                    "Failed to update the configuration in the runtime during configuration loading: {}",
+                    e
+                ));
+            }
             0
         });
     }
diff --git a/sbroad-cartridge/src/api/invalidate_cached_schema.rs b/sbroad-cartridge/src/api/invalidate_cached_schema.rs
index 107c039d4d..3d1049ae67 100644
--- a/sbroad-cartridge/src/api/invalidate_cached_schema.rs
+++ b/sbroad-cartridge/src/api/invalidate_cached_schema.rs
@@ -9,9 +9,14 @@ use sbroad::log::tarantool_error;
 /// This function should be invoked in the Lua cartridge application with `apply_config()`.
 #[no_mangle]
 pub extern "C" fn invalidate_coordinator_cache(ctx: FunctionCtx, _: FunctionArgs) -> c_int {
-    COORDINATOR_ENGINE.with(|runtime| match runtime.try_borrow_mut() {
-        Ok(mut runtime) => {
-            runtime.clear_config();
+    COORDINATOR_ENGINE.with(|runtime| match runtime.try_borrow() {
+        Ok(runtime) => {
+            if let Err(e) = runtime.clear_config() {
+                return tarantool_error(&format!(
+                    "Failed to clear the configuration in the coordinator runtime during cache invalidation: {}",
+                    e
+                ));
+            }
             if let Err(e) = runtime.clear_ir_cache() {
                 return tarantool_error(&format!(
                     "Failed to clear the IR cache on router: {:?}",
@@ -32,9 +37,14 @@ pub extern "C" fn invalidate_coordinator_cache(ctx: FunctionCtx, _: FunctionArgs
 /// This function should be invoked in the Lua cartridge application with `apply_config()`.
 #[no_mangle]
 pub extern "C" fn invalidate_segment_cache(ctx: FunctionCtx, _: FunctionArgs) -> c_int {
-    SEGMENT_ENGINE.with(|runtime| match runtime.try_borrow_mut() {
-        Ok(mut runtime) => {
-            runtime.clear_config();
+    SEGMENT_ENGINE.with(|runtime| match runtime.try_borrow() {
+        Ok(runtime) => {
+            if let Err(e) = runtime.clear_config() {
+                return tarantool_error(&format!(
+                    "Failed to clear the configuration on segment during cache invalidation: {:?}",
+                    e
+                ));
+            }
             ctx.return_mp(&true).unwrap();
             0
         }
diff --git a/sbroad-cartridge/src/cartridge/router.rs b/sbroad-cartridge/src/cartridge/router.rs
index cb21f412d7..6c86dfb762 100644
--- a/sbroad-cartridge/src/cartridge/router.rs
+++ b/sbroad-cartridge/src/cartridge/router.rs
@@ -3,7 +3,7 @@
 use rand::prelude::*;
 
 use std::any::Any;
-use std::cell::RefCell;
+use std::cell::{Ref, RefCell};
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
 use std::rc::Rc;
@@ -44,7 +44,7 @@ type GroupedBuckets = HashMap<String, Vec<u64>>;
 /// The runtime (cluster configuration, buckets, IR cache) of the dispatcher node.
 #[allow(clippy::module_name_repetitions)]
 pub struct RouterRuntime {
-    metadata: RouterConfiguration,
+    metadata: RefCell<RouterConfiguration>,
     bucket_count: usize,
     ir_cache: RefCell<LRUCache<String, Plan>>,
 }
@@ -52,21 +52,30 @@ pub struct RouterRuntime {
 impl Configuration for RouterRuntime {
     type Configuration = RouterConfiguration;
 
-    fn cached_config(&self) -> &Self::Configuration {
-        &self.metadata
+    fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError> {
+        self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e}"))
+        })
     }
 
-    fn clear_config(&mut self) {
-        self.metadata = Self::Configuration::new();
+    fn clear_config(&self) -> Result<(), SbroadError> {
+        let mut metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e}"))
+        })?;
+        *metadata = Self::Configuration::new();
+        Ok(())
     }
 
-    fn is_config_empty(&self) -> bool {
-        self.metadata.is_empty()
+    fn is_config_empty(&self) -> Result<bool, SbroadError> {
+        let metadata = self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e}"))
+        })?;
+        Ok(metadata.is_empty())
     }
 
     #[allow(clippy::too_many_lines)]
     fn get_config(&self) -> Result<Option<Self::Configuration>, SbroadError> {
-        if self.is_config_empty() {
+        if self.is_config_empty()? {
             let lua = tarantool::lua_state();
 
             let get_schema: LuaFunction<_> = lua.eval("return get_schema;").unwrap();
@@ -155,8 +164,12 @@ impl Configuration for RouterRuntime {
         Ok(None)
     }
 
-    fn update_config(&mut self, metadata: Self::Configuration) {
-        self.metadata = metadata;
+    fn update_config(&self, metadata: Self::Configuration) -> Result<(), SbroadError> {
+        let mut cached_metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e}"))
+        })?;
+        *cached_metadata = metadata;
+        Ok(())
     }
 }
 
@@ -329,7 +342,10 @@ impl Coordinator for RouterRuntime {
         space: String,
         map: &'rec HashMap<String, Value>,
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_map(&self.metadata, &space, map)
+        let metadata = self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        sharding_keys_from_map(&*metadata, &space, map)
     }
 
     fn extract_sharding_keys_from_tuple<'engine, 'rec>(
@@ -337,7 +353,7 @@ impl Coordinator for RouterRuntime {
         space: String,
         rec: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_tuple(self.cached_config(), &space, rec)
+        sharding_keys_from_tuple(&*self.cached_config()?, &space, rec)
     }
 
     /// Calculate bucket for a key.
@@ -354,7 +370,7 @@ impl RouterRuntime {
     pub fn new() -> Result<Self, SbroadError> {
         let cache: LRUCache<String, Plan> = LRUCache::new(DEFAULT_CAPACITY, None)?;
         let mut result = RouterRuntime {
-            metadata: RouterConfiguration::new(),
+            metadata: RefCell::new(RouterConfiguration::new()),
             bucket_count: 0,
             ir_cache: RefCell::new(cache),
         };
@@ -420,7 +436,7 @@ impl RouterRuntime {
             .get("dql_on_some")
             .ok_or_else(|| SbroadError::LuaError("Lua function `dql_on_some` not found".into()))?;
 
-        let waiting_timeout = &self.cached_config().get_exec_waiting_timeout();
+        let waiting_timeout = &self.cached_config()?.get_exec_waiting_timeout();
         match exec_sql.call_with_args::<Tuple, _>((rs_ir, is_readonly, waiting_timeout)) {
             Ok(v) => {
                 debug!(Option::from("dql_on_some"), &format!("Result: {:?}", &v));
@@ -447,7 +463,7 @@ impl RouterRuntime {
             .get("dml_on_some")
             .ok_or_else(|| SbroadError::LuaError("Lua function `dml_on_some` not found".into()))?;
 
-        let waiting_timeout = &self.cached_config().get_exec_waiting_timeout();
+        let waiting_timeout = &self.cached_config()?.get_exec_waiting_timeout();
         match exec_sql.call_with_args::<Tuple, _>((rs_ir, is_readonly, waiting_timeout)) {
             Ok(v) => Ok(Box::new(v)),
             Err(e) => {
@@ -484,7 +500,7 @@ impl RouterRuntime {
             .get("dql_on_all")
             .ok_or_else(|| SbroadError::LuaError("Lua function `dql_on_all` not found".into()))?;
 
-        let waiting_timeout = &self.cached_config().get_exec_waiting_timeout();
+        let waiting_timeout = &self.cached_config()?.get_exec_waiting_timeout();
         match exec_sql.call_with_args::<Tuple, _>((
             required,
             optional,
@@ -517,7 +533,7 @@ impl RouterRuntime {
             .get("dml_on_all")
             .ok_or_else(|| SbroadError::LuaError("Lua function `dml_on_all` not found".into()))?;
 
-        let waiting_timeout = &self.cached_config().get_exec_waiting_timeout();
+        let waiting_timeout = &self.cached_config()?.get_exec_waiting_timeout();
         match exec_sql.call_with_args::<Tuple, _>((
             required,
             optional,
diff --git a/sbroad-cartridge/src/cartridge/storage.rs b/sbroad-cartridge/src/cartridge/storage.rs
index 758a692586..bad6ce331e 100644
--- a/sbroad-cartridge/src/cartridge/storage.rs
+++ b/sbroad-cartridge/src/cartridge/storage.rs
@@ -11,7 +11,7 @@ use sbroad::otm::child_span;
 use sbroad::{debug, error, warn};
 use sbroad_proc::otm_child_span;
 use std::any::Any;
-use std::cell::RefCell;
+use std::cell::{Ref, RefCell};
 use std::fmt::Display;
 use tarantool::tlua::LuaFunction;
 use tarantool::tuple::Tuple;
@@ -59,27 +59,36 @@ impl std::fmt::Debug for PreparedStmt {
 
 #[allow(clippy::module_name_repetitions)]
 pub struct StorageRuntime {
-    metadata: StorageConfiguration,
+    metadata: RefCell<StorageConfiguration>,
     cache: RefCell<LRUCache<String, PreparedStmt>>,
 }
 
 impl Configuration for StorageRuntime {
     type Configuration = StorageConfiguration;
 
-    fn cached_config(&self) -> &Self::Configuration {
-        &self.metadata
+    fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError> {
+        self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })
     }
 
-    fn clear_config(&mut self) {
-        self.metadata = StorageConfiguration::default();
+    fn clear_config(&self) -> Result<(), SbroadError> {
+        let mut metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        *metadata = StorageConfiguration::default();
+        Ok(())
     }
 
-    fn is_config_empty(&self) -> bool {
-        self.metadata.is_empty()
+    fn is_config_empty(&self) -> Result<bool, SbroadError> {
+        let metadata = self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        Ok(metadata.is_empty())
     }
 
     fn get_config(&self) -> Result<Option<Self::Configuration>, SbroadError> {
-        if self.is_config_empty() {
+        if self.is_config_empty()? {
             let lua = tarantool::lua_state();
 
             let storage_cache_capacity: LuaFunction<_> =
@@ -144,9 +153,14 @@ impl Configuration for StorageRuntime {
         Ok(None)
     }
 
-    fn update_config(&mut self, metadata: Self::Configuration) {
-        self.metadata = metadata;
-        update_box_param("sql_cache_size", self.metadata.storage_size_bytes);
+    fn update_config(&self, metadata: Self::Configuration) -> Result<(), SbroadError> {
+        let mut cached_metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        let storage_size_bytes = metadata.storage_size_bytes;
+        *cached_metadata = metadata;
+        update_box_param("sql_cache_size", storage_size_bytes);
+        Ok(())
     }
 }
 
@@ -159,7 +173,7 @@ impl StorageRuntime {
         let cache: LRUCache<String, PreparedStmt> =
             LRUCache::new(DEFAULT_CAPACITY, Some(Box::new(unprepare)))?;
         let result = StorageRuntime {
-            metadata: StorageConfiguration::new(),
+            metadata: RefCell::new(StorageConfiguration::new()),
             cache: RefCell::new(cache),
         };
 
diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs
index 9668a19fb4..a3574af13c 100644
--- a/sbroad-core/src/executor.rs
+++ b/sbroad-core/src/executor.rs
@@ -111,7 +111,8 @@ where
         }
         if plan.is_empty() {
             let ast = C::ParseTree::new(sql)?;
-            plan = ast.resolve_metadata(coordinator.cached_config())?;
+            let metadata = &*coordinator.cached_config()?;
+            plan = ast.resolve_metadata(metadata)?;
             cache.put(key, plan.clone())?;
         }
         plan.bind_params(params)?;
diff --git a/sbroad-core/src/executor/engine.rs b/sbroad-core/src/executor/engine.rs
index 46412227cf..82887e6a19 100644
--- a/sbroad-core/src/executor/engine.rs
+++ b/sbroad-core/src/executor/engine.rs
@@ -3,7 +3,7 @@
 //! Traits that define an execution engine interface.
 
 use std::any::Any;
-use std::cell::RefCell;
+use std::cell::{Ref, RefCell};
 use std::cmp::Ordering;
 use std::collections::HashMap;
 
@@ -61,13 +61,22 @@ pub trait Configuration: Sized {
     type Configuration;
 
     /// Return a cached cluster configuration from the Rust memory.
-    fn cached_config(&self) -> &Self::Configuration;
+    ///
+    /// # Errors
+    /// - Failed to get a configuration from the coordinator runtime.
+    fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError>;
 
     /// Clear the cached cluster configuration in the Rust memory.
-    fn clear_config(&mut self);
+    ///
+    /// # Errors
+    /// - Failed to clear the cached configuration.
+    fn clear_config(&self) -> Result<(), SbroadError>;
 
     /// Check if the cached cluster configuration is empty.
-    fn is_config_empty(&self) -> bool;
+    ///
+    /// # Errors
+    /// - Failed to get the cached configuration.
+    fn is_config_empty(&self) -> Result<bool, SbroadError>;
 
     /// Retrieve cluster configuration from the Lua memory.
     ///
@@ -79,7 +88,10 @@ pub trait Configuration: Sized {
     fn get_config(&self) -> Result<Option<Self::Configuration>, SbroadError>;
 
     /// Update cached cluster configuration.
-    fn update_config(&mut self, metadata: Self::Configuration);
+    ///
+    /// # Errors
+    /// - Failed to update the configuration.
+    fn update_config(&self, metadata: Self::Configuration) -> Result<(), SbroadError>;
 }
 
 /// A coordinator trait.
diff --git a/sbroad-core/src/executor/engine/mock.rs b/sbroad-core/src/executor/engine/mock.rs
index 1bb3b47a13..0c69d20ed4 100644
--- a/sbroad-core/src/executor/engine/mock.rs
+++ b/sbroad-core/src/executor/engine/mock.rs
@@ -1,10 +1,10 @@
 use std::any::Any;
-use std::cell::RefCell;
+use std::cell::{Ref, RefCell};
 use std::collections::{HashMap, HashSet};
 
 use crate::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan};
 use crate::collection;
-use crate::errors::{Entity, SbroadError};
+use crate::errors::{Action, Entity, SbroadError};
 use crate::executor::bucket::Buckets;
 use crate::executor::engine::{
     normalize_name_from_sql, sharding_keys_from_map, sharding_keys_from_tuple, Configuration,
@@ -248,7 +248,7 @@ impl RouterConfigurationMock {
 
 #[allow(clippy::module_name_repetitions)]
 pub struct RouterRuntimeMock {
-    metadata: RouterConfigurationMock,
+    metadata: RefCell<RouterConfigurationMock>,
     virtual_tables: RefCell<HashMap<usize, VirtualTable>>,
     ir_cache: RefCell<LRUCache<String, Plan>>,
 }
@@ -288,16 +288,25 @@ impl ProducerResult {
 impl Configuration for RouterRuntimeMock {
     type Configuration = RouterConfigurationMock;
 
-    fn cached_config(&self) -> &Self::Configuration {
-        &self.metadata
+    fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError> {
+        self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })
     }
 
-    fn clear_config(&mut self) {
-        self.metadata.tables.clear();
+    fn clear_config(&self) -> Result<(), SbroadError> {
+        let mut metadata = self.metadata.try_borrow_mut().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        metadata.tables.clear();
+        Ok(())
     }
 
-    fn is_config_empty(&self) -> bool {
-        self.metadata.tables.is_empty()
+    fn is_config_empty(&self) -> Result<bool, SbroadError> {
+        let metadata = self.metadata.try_borrow().map_err(|e| {
+            SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
+        })?;
+        Ok(metadata.tables.is_empty())
     }
 
     fn get_config(&self) -> Result<Option<Self::Configuration>, SbroadError> {
@@ -305,8 +314,9 @@ impl Configuration for RouterRuntimeMock {
         Ok(Some(config))
     }
 
-    fn update_config(&mut self, _config: Self::Configuration) {
-        self.metadata = RouterConfigurationMock::new();
+    fn update_config(&self, _config: Self::Configuration) -> Result<(), SbroadError> {
+        *self.metadata.borrow_mut() = RouterConfigurationMock::new();
+        Ok(())
     }
 }
 
@@ -381,7 +391,7 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         args: &'rec HashMap<String, Value>,
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_map(&self.metadata, &space, args)
+        sharding_keys_from_map(&*self.metadata.borrow(), &space, args)
     }
 
     fn extract_sharding_keys_from_tuple<'engine, 'rec>(
@@ -389,11 +399,11 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         rec: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_tuple(self.cached_config(), &space, rec)
+        sharding_keys_from_tuple(&*self.metadata.borrow(), &space, rec)
     }
 
     fn determine_bucket_id(&self, s: &[&Value]) -> u64 {
-        bucket_id_by_tuple(s, self.metadata.bucket_count)
+        bucket_id_by_tuple(s, self.metadata.borrow().bucket_count)
     }
 }
 
@@ -410,7 +420,7 @@ impl RouterRuntimeMock {
     pub fn new() -> Self {
         let cache: LRUCache<String, Plan> = LRUCache::new(DEFAULT_CAPACITY, None).unwrap();
         RouterRuntimeMock {
-            metadata: RouterConfigurationMock::new(),
+            metadata: RefCell::new(RouterConfigurationMock::new()),
             virtual_tables: RefCell::new(HashMap::new()),
             ir_cache: RefCell::new(cache),
         }
-- 
GitLab