diff --git a/src/sql.rs b/src/sql.rs index 5aed004acd175554665c34065bbf4cad2221f529..6aa219c34b31243f486206594b92e611592dde9d 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -3,6 +3,7 @@ use crate::schema::{self, CreateSpaceParams, DistributionParam, Field, ShardingFn, SpaceDef}; use crate::sql::runtime::router::RouterRuntime; use crate::sql::runtime::storage::StorageRuntime; +use crate::traft::error::Error; use crate::traft::op::{Ddl as OpDdl, Op}; use crate::traft::{self, node}; @@ -32,29 +33,23 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result let ctx = params.extract_context(); let tracer = params.get_tracer(); - let result = query_span( + query_span::<Result<Tuple, Error>, _>( "\"api.router\"", &id, &tracer, &ctx, ¶ms.pattern, || { - let runtime = RouterRuntime::new()?; - let mut query = Query::new(&runtime, ¶ms.pattern, params.params)?; - if query.is_ddl()? { + let runtime = RouterRuntime::new().map_err(Error::from)?; + let mut query = + Query::new(&runtime, ¶ms.pattern, params.params).map_err(Error::from)?; + if query.is_ddl().map_err(Error::from)? { let ir_plan = query.get_exec_plan().get_ir_plan(); - let top_id = ir_plan.get_top()?; + let top_id = ir_plan.get_top().map_err(Error::from)?; let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan(); - let ddl = ir_plan_mut.get_mut_ddl_node(top_id)?; - let timeout: f64 = ddl.timeout()?; - let storage = &node::global() - .map_err(|e| { - SbroadError::Invalid( - Entity::Runtime, - Some(format!("raft node error {e:?}")), - ) - })? - .storage; + let ddl = ir_plan_mut.get_mut_ddl_node(top_id).map_err(Error::from)?; + let timeout: f64 = ddl.timeout().map_err(Error::from)?; + let storage = &node::global()?.storage; let ddl_op = match ddl { Ddl::CreateShardedTable { ref mut name, @@ -82,73 +77,33 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result sharding_fn: Some(ShardingFn::Murmur3), timeout, }; - let storage = &node::global() - .map_err(|e| { - SbroadError::Invalid( - Entity::Runtime, - Some(format!("raft node error {e:?}")), - ) - })? - .storage; - let mut params = params.validate(storage).map_err(|e| { - SbroadError::Invalid( - Entity::Table, - Some(format!("space parameters validation error {e:?}")), - ) - })?; - params.test_create_space(storage).map_err(|e| { - SbroadError::Invalid( - Entity::Table, - Some(format!("space parameters test error {e:?}")), - ) - })?; - params.into_ddl(storage).map_err(|e| { - SbroadError::FailedTo( - Action::Create, - Some(Entity::Table), - format!("{e:?}"), - ) - })? + let storage = &node::global()?.storage; + let mut params = params.validate(storage)?; + params.test_create_space(storage)?; + params.into_ddl(storage)? } Ddl::DropTable { ref name, .. } => { - let space_def: SpaceDef = storage - .spaces - .by_name(name) - .map_err(|e| { - SbroadError::FailedTo( - Action::Find, - Some(Entity::Table), - format!("{e:?}"), - ) - })? - .ok_or_else(|| { - SbroadError::FailedTo( + let space_def: SpaceDef = + storage.spaces.by_name(name)?.ok_or_else(|| { + Error::from(SbroadError::FailedTo( Action::Find, Some(Entity::Table), format!("{name} doesn't exist in pico_space"), - ) + )) })?; OpDdl::DropSpace { id: space_def.id } } }; let duration = Duration::from_secs_f64(timeout); - let schema_version = storage.properties.next_schema_version().map_err(|e| { - SbroadError::FailedTo(Action::Get, Some(Entity::Schema), format!("{e:?}")) - })?; + let schema_version = storage.properties.next_schema_version()?; let op = Op::DdlPrepare { schema_version, ddl: ddl_op, }; - let index = schema::prepare_schema_change(op, duration).map_err(|e| { - SbroadError::FailedTo(Action::Prepare, Some(Entity::Schema), format!("{e:?}")) - })?; - schema::wait_for_ddl_commit(index, duration).map_err(|e| { - SbroadError::FailedTo(Action::Create, Some(Entity::Space), format!("{e:?}")) - })?; + let index = schema::prepare_schema_change(op, duration)?; + schema::wait_for_ddl_commit(index, duration)?; let result = ConsumerResult { row_count: 1 }; - Tuple::new(&(result,)).map_err(|e| { - SbroadError::FailedTo(Action::Decode, Some(Entity::Tuple), format!("{:?}", e)) - }) + Tuple::new(&(result,)).map_err(Error::from) } else { match query.dispatch() { Ok(mut any_tuple) => { @@ -157,29 +112,21 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result Option::from("dispatch"), &format!("Dispatch result: {tuple:?}"), ); - let empty_tuple = Tuple::new(&()).map_err(|e| { - SbroadError::FailedTo( - Action::Decode, - None, - format!("tuple {:?}", e), - ) - })?; - let tuple: Tuple = std::mem::replace(tuple, empty_tuple); + let tuple: Tuple = std::mem::replace(tuple, Tuple::new(&())?); Ok(tuple) } else { - Err(SbroadError::FailedTo( + Err(Error::from(SbroadError::FailedTo( Action::Decode, None, format!("tuple {any_tuple:?}"), - )) + ))) } } - Err(e) => Err(e), + Err(e) => Err(Error::from(e)), } } }, - ); - Ok(result?) + ) } /// Executes a query sub-plan on the local node. @@ -193,8 +140,8 @@ pub fn execute(raw: &RawBytes) -> traft::Result<Tuple> { let ctx = required.extract_context(); let tracer = required.tracer(); - let result = query_span("\"api.storage\"", &id, &tracer, &ctx, "", || { - let runtime = StorageRuntime::new()?; + query_span::<Result<Tuple, Error>, _>("\"api.storage\"", &id, &tracer, &ctx, "", || { + let runtime = StorageRuntime::new().map_err(Error::from)?; match runtime.execute_plan(&mut required, &mut raw_optional) { Ok(mut any_tuple) => { if let Some(tuple) = any_tuple.downcast_mut::<Tuple>() { @@ -202,21 +149,17 @@ pub fn execute(raw: &RawBytes) -> traft::Result<Tuple> { Option::from("execute"), &format!("Execution result: {tuple:?}"), ); - let empty_tuple = Tuple::new(&()).map_err(|e| { - SbroadError::FailedTo(Action::Decode, None, format!("tuple {:?}", e)) - })?; - let tuple: Tuple = std::mem::replace(tuple, empty_tuple); + let tuple: Tuple = std::mem::replace(tuple, Tuple::new(&())?); Ok(tuple) } else { - Err(SbroadError::FailedTo( + Err(Error::from(SbroadError::FailedTo( Action::Decode, None, format!("tuple {any_tuple:?}"), - )) + ))) } } - Err(e) => Err(e), + Err(e) => Err(Error::from(e)), } - }); - Ok(result?) + }) }