diff --git a/sbroad-cartridge/src/api/exec_query.rs b/sbroad-cartridge/src/api/exec_query.rs index e9d62689d487c9aae31f39c52c00f176bfcc6277..220c2b5f25d9d4b5fda1b46c6eab123595a2f5ad 100644 --- a/sbroad-cartridge/src/api/exec_query.rs +++ b/sbroad-cartridge/src/api/exec_query.rs @@ -1,11 +1,10 @@ -use rmp::decode::RmpRead; +use sbroad::executor::engine::helpers::decode_msgpack; use std::os::raw::c_int; use tarantool::tuple::{FunctionArgs, FunctionCtx, Tuple, TupleBuffer}; use crate::api::helper::load_config; use crate::api::{COORDINATOR_ENGINE, SEGMENT_ENGINE}; use sbroad::backend::sql::ir::PatternWithParams; -use sbroad::errors::{Action, Entity, SbroadError}; use sbroad::executor::protocol::{EncodedRequiredData, RequiredData}; use sbroad::executor::Query; use sbroad::log::tarantool_error; @@ -83,61 +82,11 @@ pub extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_in ) } -fn decode_msgpack(args: FunctionArgs) -> Result<(Vec<u8>, Vec<u8>), SbroadError> { - debug!(Option::from("decode_msgpack"), &format!("args: {args:?}")); - let tuple_buf: Vec<u8> = TupleBuffer::from(Tuple::from(args)).into(); - let mut stream = rmp::decode::Bytes::from(tuple_buf.as_slice()); - let array_len = rmp::decode::read_array_len(&mut stream).map_err(|e| { - SbroadError::FailedTo( - Action::Decode, - Some(Entity::MsgPack), - format!("array length: {e:?}"), - ) - })? as usize; - if array_len != 2 { - return Err(SbroadError::Invalid( - Entity::Tuple, - Some(format!("expected tuple of 2 elements, got {array_len}")), - )); - } - let req_len = rmp::decode::read_str_len(&mut stream).map_err(|e| { - SbroadError::FailedTo( - Action::Decode, - Some(Entity::MsgPack), - format!("read required data length: {e:?}"), - ) - })? as usize; - let mut required: Vec<u8> = vec![0_u8; req_len]; - stream.read_exact_buf(&mut required).map_err(|e| { - SbroadError::FailedTo( - Action::Decode, - Some(Entity::MsgPack), - format!("read required data: {e:?}"), - ) - })?; - - let opt_len = rmp::decode::read_str_len(&mut stream).map_err(|e| { - SbroadError::FailedTo( - Action::Decode, - Some(Entity::MsgPack), - format!("read optional data string length: {e:?}"), - ) - })? as usize; - let mut optional: Vec<u8> = vec![0_u8; opt_len]; - stream.read_exact_buf(&mut optional).map_err(|e| { - SbroadError::FailedTo( - Action::Decode, - Some(Entity::MsgPack), - format!("read optional data: {e:?}"), - ) - })?; - - Ok((required, optional)) -} - #[no_mangle] pub extern "C" fn execute(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { - let (raw_required, mut raw_optional) = match decode_msgpack(args) { + debug!(Option::from("decode_msgpack"), &format!("args: {args:?}")); + let tuple_buf: Vec<u8> = TupleBuffer::from(Tuple::from(args)).into(); + let (raw_required, mut raw_optional) = match decode_msgpack(tuple_buf.as_slice()) { Ok(raw_data) => raw_data, Err(e) => { let err = format!("Failed to decode dispatched data: {e:?}"); diff --git a/sbroad-core/src/executor/engine/helpers.rs b/sbroad-core/src/executor/engine/helpers.rs index 361be6343691a9c09c8a7040a44441cf32583a46..62c25af35c593674a296d513330312950fffd001 100644 --- a/sbroad-core/src/executor/engine/helpers.rs +++ b/sbroad-core/src/executor/engine/helpers.rs @@ -5,7 +5,10 @@ use std::{ rc::Rc, }; -use tarantool::tuple::Tuple; +use tarantool::tuple::{ + rmp::{self, decode::RmpRead}, + Tuple, +}; use crate::{ backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan}, @@ -66,6 +69,62 @@ pub fn encode_plan(exec_plan: ExecutionPlan) -> Result<(Binary, Binary), SbroadE Ok((raw_required_data.into(), raw_optional_data.into())) } +/// Decode the execution plan from msgpack into a pair of binary data: +/// * required data (plan id, parameters, etc.) +/// * optional data (execution plan, etc.) +/// +/// # Errors +/// - Failed to decode the execution plan. +pub fn decode_msgpack(tuple_buf: &[u8]) -> Result<(Vec<u8>, Vec<u8>), SbroadError> { + let mut stream = rmp::decode::Bytes::from(tuple_buf); + let array_len = rmp::decode::read_array_len(&mut stream).map_err(|e| { + SbroadError::FailedTo( + Action::Decode, + Some(Entity::MsgPack), + format!("array length: {e:?}"), + ) + })? as usize; + if array_len != 2 { + return Err(SbroadError::Invalid( + Entity::Tuple, + Some(format!("expected tuple of 2 elements, got {array_len}")), + )); + } + let req_len = rmp::decode::read_str_len(&mut stream).map_err(|e| { + SbroadError::FailedTo( + Action::Decode, + Some(Entity::MsgPack), + format!("read required data length: {e:?}"), + ) + })? as usize; + let mut required: Vec<u8> = vec![0_u8; req_len]; + stream.read_exact_buf(&mut required).map_err(|e| { + SbroadError::FailedTo( + Action::Decode, + Some(Entity::MsgPack), + format!("read required data: {e:?}"), + ) + })?; + + let opt_len = rmp::decode::read_str_len(&mut stream).map_err(|e| { + SbroadError::FailedTo( + Action::Decode, + Some(Entity::MsgPack), + format!("read optional data string length: {e:?}"), + ) + })? as usize; + let mut optional: Vec<u8> = vec![0_u8; opt_len]; + stream.read_exact_buf(&mut optional).map_err(|e| { + SbroadError::FailedTo( + Action::Decode, + Some(Entity::MsgPack), + format!("read optional data: {e:?}"), + ) + })?; + + Ok((required, optional)) +} + /// Format explain output into a tuple. /// /// # Errors