From 4aba51fc298b8f87b13045fd1783ba125f257db4 Mon Sep 17 00:00:00 2001 From: Fedor Telnov <f.telnov@picodata.io> Date: Tue, 26 Mar 2024 11:44:50 +0000 Subject: [PATCH] refactor: better interfaces, open cartridge API --- sbroad-cartridge/src/api.rs | 6 +++--- sbroad-cartridge/src/api/calculate_bucket_id.rs | 2 +- sbroad-cartridge/src/api/exec_query.rs | 4 ++-- .../src/api/invalidate_cached_schema.rs | 4 ++-- sbroad-cartridge/src/cartridge/storage.rs | 6 +++++- sbroad-cartridge/src/lib.rs | 4 ++-- sbroad-core/src/executor/hash.rs | 13 +++++++++++-- 7 files changed, 26 insertions(+), 13 deletions(-) diff --git a/sbroad-cartridge/src/api.rs b/sbroad-cartridge/src/api.rs index 27b3dc1ea6..496854d2fa 100644 --- a/sbroad-cartridge/src/api.rs +++ b/sbroad-cartridge/src/api.rs @@ -8,9 +8,9 @@ pub struct AsyncCommands { pub invalidate_segment_cache: bool, } -thread_local!(static COORDINATOR_ENGINE: RefCell<RouterRuntime> = RefCell::new(RouterRuntime::new().unwrap())); -thread_local!(static SEGMENT_ENGINE: RefCell<StorageRuntime> = RefCell::new(StorageRuntime::new().unwrap())); -thread_local!(static ASYNC_COMMANDS: RefCell<AsyncCommands> = RefCell::new(AsyncCommands::default())); +thread_local!(pub static COORDINATOR_ENGINE: RefCell<RouterRuntime> = RefCell::new(RouterRuntime::new().unwrap())); +thread_local!(pub static SEGMENT_ENGINE: RefCell<StorageRuntime> = RefCell::new(StorageRuntime::new().unwrap())); +thread_local!(pub static ASYNC_COMMANDS: RefCell<AsyncCommands> = RefCell::new(AsyncCommands::default())); pub mod calculate_bucket_id; pub mod exec_query; diff --git a/sbroad-cartridge/src/api/calculate_bucket_id.rs b/sbroad-cartridge/src/api/calculate_bucket_id.rs index f3200de2cd..b2fd0880ba 100644 --- a/sbroad-cartridge/src/api/calculate_bucket_id.rs +++ b/sbroad-cartridge/src/api/calculate_bucket_id.rs @@ -113,7 +113,7 @@ impl TryFrom<FunctionArgs> for Args { } #[no_mangle] -pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c_int { +extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c_int { let ret_code = load_config(&COORDINATOR_ENGINE); if ret_code != 0 { return ret_code; diff --git a/sbroad-cartridge/src/api/exec_query.rs b/sbroad-cartridge/src/api/exec_query.rs index c59f7c0462..e4be298538 100644 --- a/sbroad-cartridge/src/api/exec_query.rs +++ b/sbroad-cartridge/src/api/exec_query.rs @@ -13,7 +13,7 @@ use sbroad::{debug, error}; /// Dispatch parameterized SQL query from coordinator to the segments. #[no_mangle] -pub extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { +extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { let lua_params = match PatternWithParams::try_from(args) { Ok(params) => params, Err(e) => { @@ -78,7 +78,7 @@ pub extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_in } #[no_mangle] -pub extern "C" fn execute(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { +extern "C" fn execute(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { debug!(Option::from("decode_msgpack"), &format!("args: {args:?}")); let tuple_buf: Vec<u8> = TupleBuffer::from(Tuple::from(args)).into(); let (raw_required, mut raw_optional) = match decode_msgpack(tuple_buf.as_slice()) { diff --git a/sbroad-cartridge/src/api/invalidate_cached_schema.rs b/sbroad-cartridge/src/api/invalidate_cached_schema.rs index 786c99f5ea..c2179f7194 100644 --- a/sbroad-cartridge/src/api/invalidate_cached_schema.rs +++ b/sbroad-cartridge/src/api/invalidate_cached_schema.rs @@ -10,7 +10,7 @@ use sbroad::{executor::engine::QueryCache, log::tarantool_error}; /// Flush cached configuration in the Rust memory of the coordinator runtime. /// This function should be invoked in the Lua cartridge application with `apply_config()`. #[no_mangle] -pub extern "C" fn invalidate_coordinator_cache(ctx: FunctionCtx, _: FunctionArgs) -> c_int { +extern "C" fn invalidate_coordinator_cache(ctx: FunctionCtx, _: FunctionArgs) -> c_int { COORDINATOR_ENGINE.with(|runtime| match runtime.try_borrow() { Ok(runtime) => { if let Err(e) = runtime.clear_config() { @@ -35,7 +35,7 @@ pub extern "C" fn invalidate_coordinator_cache(ctx: FunctionCtx, _: FunctionArgs /// Flush cached configuration in the Rust memory of the segment runtime. /// This function should be invoked in the Lua cartridge application with `apply_config()`. #[no_mangle] -pub extern "C" fn invalidate_segment_cache(ctx: FunctionCtx, _: FunctionArgs) -> c_int { +extern "C" fn invalidate_segment_cache(ctx: FunctionCtx, _: FunctionArgs) -> c_int { SEGMENT_ENGINE.with(|runtime| match runtime.try_borrow() { Ok(runtime) => { if let Err(e) = runtime.clear_config() { diff --git a/sbroad-cartridge/src/cartridge/storage.rs b/sbroad-cartridge/src/cartridge/storage.rs index 30221b3ae9..8a56428bff 100644 --- a/sbroad-cartridge/src/cartridge/storage.rs +++ b/sbroad-cartridge/src/cartridge/storage.rs @@ -226,7 +226,11 @@ impl StorageRuntime { Ok(result) } - #[allow(unused_variables)] + /// Executes provided plan. + /// + /// # Errors + /// + /// Will return `Err` if underlying DML/DQL implementation returns `Err`. pub fn execute_plan( &self, required: &mut RequiredData, diff --git a/sbroad-cartridge/src/lib.rs b/sbroad-cartridge/src/lib.rs index ef0a08a72f..092cd0afdc 100644 --- a/sbroad-cartridge/src/lib.rs +++ b/sbroad-cartridge/src/lib.rs @@ -1,2 +1,2 @@ -mod api; -mod cartridge; +pub mod api; +pub mod cartridge; diff --git a/sbroad-core/src/executor/hash.rs b/sbroad-core/src/executor/hash.rs index 3625b8d820..4f257b8337 100644 --- a/sbroad-core/src/executor/hash.rs +++ b/sbroad-core/src/executor/hash.rs @@ -4,6 +4,12 @@ pub trait ToHashString { fn to_hash_string(&self) -> String; } +impl<T: ToHashString> ToHashString for &T { + fn to_hash_string(&self) -> String { + T::to_hash_string(self) + } +} + #[must_use] /// A simple function to calculate the bucket id from a string slice. /// `(MurMur3 hash at str) % bucket_count + 1` @@ -14,11 +20,14 @@ pub fn str_to_bucket_id(s: &str, bucket_count: u64) -> u64 { } #[must_use] -pub fn bucket_id_by_tuple<T>(tuple: &[&T], bucket_count: u64) -> u64 +pub fn bucket_id_by_tuple<T>(sharding_val: impl IntoIterator<Item = T>, bucket_count: u64) -> u64 where T: ToHashString, { - let hash_str = tuple.iter().map(|v| v.to_hash_string()).collect::<String>(); + let hash_str = sharding_val + .into_iter() + .map(|v| v.to_hash_string()) + .collect::<String>(); str_to_bucket_id(&hash_str, bucket_count) } -- GitLab