Skip to content
Snippets Groups Projects
Verified Commit 155f2886 authored by Denis Smirnov's avatar Denis Smirnov
Browse files

refactoring: return traft errors from SQL

Previously we created a very strange error sandwitch when failed
on the storage or router runtimes:
- wrapped traft error into a sbroad one
- wrapped the result again into traft error

Now we wrapp sbroad errors into traft or return traft errors as is.
parent 128afabf
No related branches found
No related tags found
1 merge request!604Support SQL CREATE TABLE command
......@@ -3,6 +3,7 @@
use crate::schema::{self, CreateSpaceParams, DistributionParam, Field, ShardingFn, SpaceDef};
use crate::sql::runtime::router::RouterRuntime;
use crate::sql::runtime::storage::StorageRuntime;
use crate::traft::error::Error;
use crate::traft::op::{Ddl as OpDdl, Op};
use crate::traft::{self, node};
......@@ -32,29 +33,23 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
let ctx = params.extract_context();
let tracer = params.get_tracer();
let result = query_span(
query_span::<Result<Tuple, Error>, _>(
"\"api.router\"",
&id,
&tracer,
&ctx,
&params.pattern,
|| {
let runtime = RouterRuntime::new()?;
let mut query = Query::new(&runtime, &params.pattern, params.params)?;
if query.is_ddl()? {
let runtime = RouterRuntime::new().map_err(Error::from)?;
let mut query =
Query::new(&runtime, &params.pattern, params.params).map_err(Error::from)?;
if query.is_ddl().map_err(Error::from)? {
let ir_plan = query.get_exec_plan().get_ir_plan();
let top_id = ir_plan.get_top()?;
let top_id = ir_plan.get_top().map_err(Error::from)?;
let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan();
let ddl = ir_plan_mut.get_mut_ddl_node(top_id)?;
let timeout: f64 = ddl.timeout()?;
let storage = &node::global()
.map_err(|e| {
SbroadError::Invalid(
Entity::Runtime,
Some(format!("raft node error {e:?}")),
)
})?
.storage;
let ddl = ir_plan_mut.get_mut_ddl_node(top_id).map_err(Error::from)?;
let timeout: f64 = ddl.timeout().map_err(Error::from)?;
let storage = &node::global()?.storage;
let ddl_op = match ddl {
Ddl::CreateShardedTable {
ref mut name,
......@@ -82,73 +77,33 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
sharding_fn: Some(ShardingFn::Murmur3),
timeout,
};
let storage = &node::global()
.map_err(|e| {
SbroadError::Invalid(
Entity::Runtime,
Some(format!("raft node error {e:?}")),
)
})?
.storage;
let mut params = params.validate(storage).map_err(|e| {
SbroadError::Invalid(
Entity::Table,
Some(format!("space parameters validation error {e:?}")),
)
})?;
params.test_create_space(storage).map_err(|e| {
SbroadError::Invalid(
Entity::Table,
Some(format!("space parameters test error {e:?}")),
)
})?;
params.into_ddl(storage).map_err(|e| {
SbroadError::FailedTo(
Action::Create,
Some(Entity::Table),
format!("{e:?}"),
)
})?
let storage = &node::global()?.storage;
let mut params = params.validate(storage)?;
params.test_create_space(storage)?;
params.into_ddl(storage)?
}
Ddl::DropTable { ref name, .. } => {
let space_def: SpaceDef = storage
.spaces
.by_name(name)
.map_err(|e| {
SbroadError::FailedTo(
Action::Find,
Some(Entity::Table),
format!("{e:?}"),
)
})?
.ok_or_else(|| {
SbroadError::FailedTo(
let space_def: SpaceDef =
storage.spaces.by_name(name)?.ok_or_else(|| {
Error::from(SbroadError::FailedTo(
Action::Find,
Some(Entity::Table),
format!("{name} doesn't exist in pico_space"),
)
))
})?;
OpDdl::DropSpace { id: space_def.id }
}
};
let duration = Duration::from_secs_f64(timeout);
let schema_version = storage.properties.next_schema_version().map_err(|e| {
SbroadError::FailedTo(Action::Get, Some(Entity::Schema), format!("{e:?}"))
})?;
let schema_version = storage.properties.next_schema_version()?;
let op = Op::DdlPrepare {
schema_version,
ddl: ddl_op,
};
let index = schema::prepare_schema_change(op, duration).map_err(|e| {
SbroadError::FailedTo(Action::Prepare, Some(Entity::Schema), format!("{e:?}"))
})?;
schema::wait_for_ddl_commit(index, duration).map_err(|e| {
SbroadError::FailedTo(Action::Create, Some(Entity::Space), format!("{e:?}"))
})?;
let index = schema::prepare_schema_change(op, duration)?;
schema::wait_for_ddl_commit(index, duration)?;
let result = ConsumerResult { row_count: 1 };
Tuple::new(&(result,)).map_err(|e| {
SbroadError::FailedTo(Action::Decode, Some(Entity::Tuple), format!("{:?}", e))
})
Tuple::new(&(result,)).map_err(Error::from)
} else {
match query.dispatch() {
Ok(mut any_tuple) => {
......@@ -157,29 +112,21 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
Option::from("dispatch"),
&format!("Dispatch result: {tuple:?}"),
);
let empty_tuple = Tuple::new(&()).map_err(|e| {
SbroadError::FailedTo(
Action::Decode,
None,
format!("tuple {:?}", e),
)
})?;
let tuple: Tuple = std::mem::replace(tuple, empty_tuple);
let tuple: Tuple = std::mem::replace(tuple, Tuple::new(&())?);
Ok(tuple)
} else {
Err(SbroadError::FailedTo(
Err(Error::from(SbroadError::FailedTo(
Action::Decode,
None,
format!("tuple {any_tuple:?}"),
))
)))
}
}
Err(e) => Err(e),
Err(e) => Err(Error::from(e)),
}
}
},
);
Ok(result?)
)
}
/// Executes a query sub-plan on the local node.
......@@ -193,8 +140,8 @@ pub fn execute(raw: &RawBytes) -> traft::Result<Tuple> {
let ctx = required.extract_context();
let tracer = required.tracer();
let result = query_span("\"api.storage\"", &id, &tracer, &ctx, "", || {
let runtime = StorageRuntime::new()?;
query_span::<Result<Tuple, Error>, _>("\"api.storage\"", &id, &tracer, &ctx, "", || {
let runtime = StorageRuntime::new().map_err(Error::from)?;
match runtime.execute_plan(&mut required, &mut raw_optional) {
Ok(mut any_tuple) => {
if let Some(tuple) = any_tuple.downcast_mut::<Tuple>() {
......@@ -202,21 +149,17 @@ pub fn execute(raw: &RawBytes) -> traft::Result<Tuple> {
Option::from("execute"),
&format!("Execution result: {tuple:?}"),
);
let empty_tuple = Tuple::new(&()).map_err(|e| {
SbroadError::FailedTo(Action::Decode, None, format!("tuple {:?}", e))
})?;
let tuple: Tuple = std::mem::replace(tuple, empty_tuple);
let tuple: Tuple = std::mem::replace(tuple, Tuple::new(&())?);
Ok(tuple)
} else {
Err(SbroadError::FailedTo(
Err(Error::from(SbroadError::FailedTo(
Action::Decode,
None,
format!("tuple {any_tuple:?}"),
))
)))
}
}
Err(e) => Err(e),
Err(e) => Err(Error::from(e)),
}
});
Ok(result?)
})
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment