From 08e3f622ae9a4aa702ed9146272c0bfd83fc9e07 Mon Sep 17 00:00:00 2001 From: Denis Smirnov <sd@picodata.io> Date: Wed, 5 Apr 2023 11:22:33 +0700 Subject: [PATCH] refactoring: review fixes --- .../src/api/exec_query/protocol.rs | 318 ------------------ sbroad-cartridge/src/api/helper.rs | 4 +- .../src/api/invalidate_cached_schema.rs | 2 +- sbroad-cartridge/src/cartridge.rs | 4 +- sbroad-cartridge/src/cartridge/router.rs | 6 +- sbroad-cartridge/src/cartridge/storage.rs | 4 +- sbroad-core/src/executor/engine.rs | 8 +- sbroad-core/src/executor/engine/helpers.rs | 20 +- sbroad-core/src/executor/protocol.rs | 5 +- 9 files changed, 34 insertions(+), 337 deletions(-) delete mode 100644 sbroad-cartridge/src/api/exec_query/protocol.rs diff --git a/sbroad-cartridge/src/api/exec_query/protocol.rs b/sbroad-cartridge/src/api/exec_query/protocol.rs deleted file mode 100644 index c63ee88168..0000000000 --- a/sbroad-cartridge/src/api/exec_query/protocol.rs +++ /dev/null @@ -1,318 +0,0 @@ -use opentelemetry::trace::TraceContextExt; -use opentelemetry::Context; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use tarantool::tlua::{self, AsLua, PushGuard, PushInto, PushOneInto, Void}; - -use sbroad::backend::sql::tree::OrderedSyntaxNodes; -use sbroad::debug; -use sbroad::errors::{Action, Entity, SbroadError}; -use sbroad::executor::ir::{ExecutionPlan, QueryType}; -use sbroad::ir::value::Value; -use sbroad::otm::{ - current_id, extract_context, force_trace, get_tracer, inject_context, QueryTracer, -}; - -#[derive(Debug, Deserialize, Serialize, PartialEq)] -pub struct Binary(Vec<u8>); - -impl From<Vec<u8>> for Binary { - fn from(value: Vec<u8>) -> Self { - Binary(value) - } -} - -impl<L> PushInto<L> for Binary -where - L: AsLua, -{ - type Err = Void; - - fn push_into_lua(self, lua: L) -> Result<PushGuard<L>, (Void, L)> { - let encoded = unsafe { String::from_utf8_unchecked(self.0) }; - encoded.push_into_lua(lua) - } -} - -impl<L> PushOneInto<L> for Binary where L: AsLua {} - -#[derive(PushInto, Debug, Deserialize, Serialize, PartialEq)] -pub struct Message { - required: Binary, - optional: Binary, -} - -impl From<(Binary, Binary)> for Message { - fn from(value: (Binary, Binary)) -> Self { - Message { - required: value.0, - optional: value.1, - } - } -} - -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] -pub struct RequiredData { - pub(crate) plan_id: String, - pub(crate) parameters: Vec<Value>, - pub(crate) query_type: QueryType, - pub(crate) can_be_cached: bool, - context: ContextCarrier, - force_trace: bool, - trace_id: Option<String>, -} - -impl Default for RequiredData { - fn default() -> Self { - RequiredData { - plan_id: String::new(), - parameters: vec![], - query_type: QueryType::DQL, - can_be_cached: true, - context: ContextCarrier::empty(), - force_trace: false, - trace_id: None, - } - } -} - -impl TryFrom<RequiredData> for Vec<u8> { - type Error = SbroadError; - - fn try_from(value: RequiredData) -> Result<Self, Self::Error> { - bincode::serialize(&value).map_err(|e| { - SbroadError::FailedTo( - Action::Serialize, - Some(Entity::RequiredData), - format!("to binary: {e:?}"), - ) - }) - } -} - -impl TryFrom<&[u8]> for RequiredData { - type Error = SbroadError; - - fn try_from(value: &[u8]) -> Result<Self, Self::Error> { - bincode::deserialize(value).map_err(|e| { - SbroadError::FailedTo( - Action::Deserialize, - Some(Entity::RequiredData), - format!("{e:?}"), - ) - }) - } -} - -impl RequiredData { - pub fn new( - plan_id: String, - parameters: Vec<Value>, - query_type: QueryType, - can_be_cached: bool, - ) -> Self { - let mut carrier = HashMap::new(); - inject_context(&mut carrier); - let force_trace = force_trace(); - if carrier.is_empty() { - RequiredData { - plan_id, - parameters, - query_type, - can_be_cached, - context: ContextCarrier::empty(), - force_trace, - trace_id: None, - } - } else { - RequiredData { - plan_id, - parameters, - query_type, - can_be_cached, - context: ContextCarrier::new(carrier), - force_trace, - trace_id: Some(current_id()), - } - } - } - - pub fn tracer(&self) -> QueryTracer { - get_tracer( - self.force_trace, - self.trace_id.as_ref(), - Some(&self.context.payload), - ) - } - - pub fn id(&self) -> &str { - match &self.trace_id { - Some(trace_id) => trace_id, - None => &self.plan_id, - } - } - - pub fn extract_context(&mut self) -> Context { - (&mut self.context).into() - } -} - -pub struct EncodedRequiredData(Vec<u8>); - -impl From<Vec<u8>> for EncodedRequiredData { - fn from(value: Vec<u8>) -> Self { - EncodedRequiredData(value) - } -} - -impl From<EncodedRequiredData> for Vec<u8> { - fn from(value: EncodedRequiredData) -> Self { - value.0 - } -} - -impl TryFrom<RequiredData> for EncodedRequiredData { - type Error = SbroadError; - - fn try_from(value: RequiredData) -> Result<Self, Self::Error> { - let bytes: Vec<u8> = value.try_into()?; - Ok(EncodedRequiredData(bytes)) - } -} - -impl TryFrom<EncodedRequiredData> for RequiredData { - type Error = SbroadError; - - fn try_from(value: EncodedRequiredData) -> Result<Self, Self::Error> { - let ir: RequiredData = value.0.as_slice().try_into()?; - Ok(ir) - } -} - -#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] -pub struct OptionalData { - pub(crate) exec_plan: ExecutionPlan, - pub(crate) ordered: OrderedSyntaxNodes, -} - -impl TryFrom<OptionalData> for Vec<u8> { - type Error = SbroadError; - - fn try_from(value: OptionalData) -> Result<Self, Self::Error> { - bincode::serialize(&value).map_err(|e| { - SbroadError::FailedTo( - Action::Serialize, - Some(Entity::RequiredData), - format!("to binary: {e:?}"), - ) - }) - } -} - -impl TryFrom<&[u8]> for OptionalData { - type Error = SbroadError; - - fn try_from(value: &[u8]) -> Result<Self, Self::Error> { - bincode::deserialize(value).map_err(|e| { - SbroadError::FailedTo( - Action::Deserialize, - Some(Entity::RequiredData), - format!("{e:?}"), - ) - }) - } -} - -impl OptionalData { - pub fn new(exec_plan: ExecutionPlan, ordered: OrderedSyntaxNodes) -> Self { - OptionalData { exec_plan, ordered } - } - - pub fn to_bytes(&self) -> Result<Vec<u8>, SbroadError> { - bincode::serialize(self).map_err(|e| { - SbroadError::FailedTo( - Action::Serialize, - Some(Entity::RequiredData), - format!("to binary: {e:?}"), - ) - }) - } - - pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, SbroadError> { - bincode::deserialize(bytes).map_err(|e| { - SbroadError::FailedTo( - Action::Deserialize, - Some(Entity::RequiredData), - format!("{e:?}"), - ) - }) - } -} - -pub struct EncodedOptionalData(Vec<u8>); - -impl From<Vec<u8>> for EncodedOptionalData { - fn from(value: Vec<u8>) -> Self { - EncodedOptionalData(value) - } -} - -impl From<EncodedOptionalData> for Vec<u8> { - fn from(value: EncodedOptionalData) -> Self { - value.0 - } -} - -impl TryFrom<OptionalData> for EncodedOptionalData { - type Error = SbroadError; - - fn try_from(value: OptionalData) -> Result<Self, Self::Error> { - let bytes: Vec<u8> = value.try_into()?; - Ok(EncodedOptionalData(bytes)) - } -} - -impl TryFrom<EncodedOptionalData> for OptionalData { - type Error = SbroadError; - - fn try_from(value: EncodedOptionalData) -> Result<Self, Self::Error> { - let ir: OptionalData = value.0.as_slice().try_into()?; - Ok(ir) - } -} - -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] -pub struct ContextCarrier { - payload: HashMap<String, String>, -} - -impl ContextCarrier { - pub fn new(payload: HashMap<String, String>) -> Self { - ContextCarrier { payload } - } - - pub fn empty() -> Self { - ContextCarrier { - payload: HashMap::new(), - } - } -} - -impl From<&mut ContextCarrier> for Context { - fn from(carrier: &mut ContextCarrier) -> Self { - if carrier.payload.is_empty() { - Context::new() - } else { - debug!( - Option::from("dispatched IR"), - &format!("Serialized OTM span context: {:?}", carrier.payload), - ); - let ctx = extract_context(&mut carrier.payload); - debug!( - Option::from("dispatched IR"), - &format!("Deserialized OTM span context: {:?}", ctx.span()), - ); - ctx - } - } -} diff --git a/sbroad-cartridge/src/api/helper.rs b/sbroad-cartridge/src/api/helper.rs index e3b7bc13aa..8eaeb89625 100644 --- a/sbroad-cartridge/src/api/helper.rs +++ b/sbroad-cartridge/src/api/helper.rs @@ -4,11 +4,11 @@ use std::cell::RefCell; use std::os::raw::c_int; use std::thread::LocalKey; -use crate::cartridge::Configuration; +use crate::cartridge::ConfigurationProvider; pub fn load_config<Runtime>(engine: &'static LocalKey<RefCell<Runtime>>) -> c_int where - Runtime: Configuration, + Runtime: ConfigurationProvider, { // Tarantool can yield in the middle of a current closure, // so we can hold only an immutable reference to the engine. diff --git a/sbroad-cartridge/src/api/invalidate_cached_schema.rs b/sbroad-cartridge/src/api/invalidate_cached_schema.rs index e6432d7530..2a0df21cfc 100644 --- a/sbroad-cartridge/src/api/invalidate_cached_schema.rs +++ b/sbroad-cartridge/src/api/invalidate_cached_schema.rs @@ -3,7 +3,7 @@ use tarantool::tuple::{FunctionArgs, FunctionCtx}; use crate::{ api::{COORDINATOR_ENGINE, SEGMENT_ENGINE}, - cartridge::Configuration, + cartridge::ConfigurationProvider, }; use sbroad::{executor::engine::QueryCache, log::tarantool_error}; diff --git a/sbroad-cartridge/src/cartridge.rs b/sbroad-cartridge/src/cartridge.rs index 75e560158f..5525071929 100644 --- a/sbroad-cartridge/src/cartridge.rs +++ b/sbroad-cartridge/src/cartridge.rs @@ -36,8 +36,8 @@ pub fn update_tracing(host: &str, port: u16) -> Result<(), SbroadError> { Ok(()) } -/// Cartridge cluster configuration. -pub trait Configuration: Sized { +/// Cartridge cluster configuration provider. +pub trait ConfigurationProvider: Sized { type Configuration; /// Return a cached cluster configuration from the Rust memory. diff --git a/sbroad-cartridge/src/cartridge/router.rs b/sbroad-cartridge/src/cartridge/router.rs index a1ff3b35a1..1d502f81c7 100644 --- a/sbroad-cartridge/src/cartridge/router.rs +++ b/sbroad-cartridge/src/cartridge/router.rs @@ -39,7 +39,7 @@ use sbroad::ir::Plan; use sbroad::otm::child_span; use sbroad_proc::otm_child_span; -use super::Configuration; +use super::ConfigurationProvider; /// The runtime (cluster configuration, buckets, IR cache) of the dispatcher node. #[allow(clippy::module_name_repetitions)] @@ -49,7 +49,7 @@ pub struct RouterRuntime { ir_cache: RefCell<LRUCache<String, Plan>>, } -impl Configuration for RouterRuntime { +impl ConfigurationProvider for RouterRuntime { type Configuration = RouterConfiguration; fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError> { @@ -226,7 +226,7 @@ impl Router for RouterRuntime { } fn explain_format(&self, explain: String) -> Result<Box<dyn Any>, SbroadError> { - explain_format(explain) + explain_format(&explain) } /// Transform sub query results into a virtual table. diff --git a/sbroad-cartridge/src/cartridge/storage.rs b/sbroad-cartridge/src/cartridge/storage.rs index f0667360e4..0f722c377b 100644 --- a/sbroad-cartridge/src/cartridge/storage.rs +++ b/sbroad-cartridge/src/cartridge/storage.rs @@ -17,7 +17,7 @@ use std::fmt::Display; use tarantool::tlua::LuaFunction; use tarantool::tuple::Tuple; -use super::Configuration; +use super::ConfigurationProvider; struct Statement { id: u32, @@ -91,7 +91,7 @@ impl QueryCache for StorageRuntime { } } -impl Configuration for StorageRuntime { +impl ConfigurationProvider for StorageRuntime { type Configuration = StorageConfiguration; fn cached_config(&self) -> Result<Ref<Self::Configuration>, SbroadError> { diff --git a/sbroad-core/src/executor/engine.rs b/sbroad-core/src/executor/engine.rs index 2bc282ad5f..85f339536c 100644 --- a/sbroad-core/src/executor/engine.rs +++ b/sbroad-core/src/executor/engine.rs @@ -18,7 +18,7 @@ use crate::ir::relation::Table; use crate::ir::value::Value; use super::ir::{ConnectionType, QueryType}; -use super::protocol::{Binary, Message}; +use super::protocol::Binary; pub mod helpers; #[cfg(test)] @@ -93,13 +93,15 @@ pub trait Router: QueryCache { /// Get the metadata provider (tables, functions, etc.). /// /// # Errors - /// - Internal runtime error. + /// - Internal error. Under normal conditions we should always return + /// metadata successfully. fn metadata(&self) -> Result<Ref<Self::MetadataProvider>, SbroadError>; /// Setup output format of query explain /// /// # Errors - /// - internal executor errors + /// - Internal error. Under normal conditions we should always return + /// formatted explain successfully. fn explain_format(&self, explain: String) -> Result<Box<dyn Any>, SbroadError>; /// Extract a list of the sharding keys from a map for the given space. diff --git a/sbroad-core/src/executor/engine/helpers.rs b/sbroad-core/src/executor/engine/helpers.rs index 722fb8224b..361be63436 100644 --- a/sbroad-core/src/executor/engine/helpers.rs +++ b/sbroad-core/src/executor/engine/helpers.rs @@ -38,6 +38,12 @@ pub fn normalize_name_from_sql(s: &str) -> String { format!("\"{}\"", s.to_uppercase()) } +/// A helper function to encode the execution plan into a pair of binary data: +/// * required data (plan id, parameters, etc.) +/// * optional data (execution plan, etc.) +/// +/// # Errors +/// - Failed to encode the execution plan. pub fn encode_plan(exec_plan: ExecutionPlan) -> Result<(Binary, Binary), SbroadError> { // We should not use the cache on the storage if the plan contains virtual tables, // as they can contain different amount of tuples that are not taken into account @@ -60,7 +66,11 @@ pub fn encode_plan(exec_plan: ExecutionPlan) -> Result<(Binary, Binary), SbroadE Ok((raw_required_data.into(), raw_optional_data.into())) } -pub fn explain_format(explain: String) -> Result<Box<dyn Any>, SbroadError> { +/// Format explain output into a tuple. +/// +/// # Errors +/// - Failed to create a tuple. +pub fn explain_format(explain: &str) -> Result<Box<dyn Any>, SbroadError> { let e = explain.lines().collect::<Vec<&str>>(); match Tuple::new(&vec![e]) { @@ -78,7 +88,7 @@ pub fn explain_format(explain: String) -> Result<Box<dyn Any>, SbroadError> { /// # Errors /// - Internal errors during the execution. pub fn dispatch( - runtime: &(impl Router + Vshard), + runtime: &impl Vshard, plan: &mut ExecutionPlan, top_id: usize, buckets: &Buckets, @@ -169,11 +179,11 @@ pub fn filter_vtable(plan: &mut ExecutionPlan, bucket_ids: &[u64]) { /// # Errors /// - The space was not found in the metadata. /// - The sharding keys are not present in the space. -pub fn sharding_keys_from_tuple<'rec>( +pub fn sharding_keys_from_tuple<'tuple>( conf: &impl Metadata, space: &str, - tuple: &'rec [Value], -) -> Result<Vec<&'rec Value>, SbroadError> { + tuple: &'tuple [Value], +) -> Result<Vec<&'tuple Value>, SbroadError> { let quoted_space = normalize_name_from_schema(space); let sharding_positions = conf.sharding_positions_by_space("ed_space)?; let mut sharding_tuple = Vec::with_capacity(sharding_positions.len()); diff --git a/sbroad-core/src/executor/protocol.rs b/sbroad-core/src/executor/protocol.rs index 68b7fe35a7..ec5ec5fba6 100644 --- a/sbroad-core/src/executor/protocol.rs +++ b/sbroad-core/src/executor/protocol.rs @@ -1,4 +1,4 @@ -use opentelemetry::{trace::TraceContextExt, Context}; +use opentelemetry::Context; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tarantool::tlua::{self, AsLua, PushGuard, PushInto, PushOneInto, Void}; @@ -12,6 +12,9 @@ use crate::otm::{ current_id, extract_context, force_trace, get_tracer, inject_context, QueryTracer, }; +#[cfg(not(feature = "mock"))] +use opentelemetry::trace::TraceContextExt; + #[derive(Debug, Deserialize, Serialize, PartialEq)] pub struct Binary(Vec<u8>); -- GitLab