Skip to content
Snippets Groups Projects
Commit 9b32fa04 authored by Igor Kuznetsov's avatar Igor Kuznetsov
Browse files

refactoring: migrate to ref engine and validate empty schema was moved to load function

parent 9795a228
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -23,6 +23,8 @@
//! 5. Repeats step 3 till we are done with motion layers.
//! 6. Executes the final IR top subtree and returns the final result to the user.
use std::collections::HashMap;
use crate::errors::QueryPlannerError;
use crate::executor::bucket::Buckets;
use crate::executor::engine::Engine;
......@@ -31,7 +33,6 @@ use crate::executor::ir::ExecutionPlan;
use crate::executor::result::BoxExecuteFormat;
use crate::frontend::sql::ast::AbstractSyntaxTree;
use crate::ir::Plan;
use std::collections::HashMap;
pub mod bucket;
pub mod engine;
......@@ -54,19 +55,19 @@ impl Plan {
}
/// Query to execute.
pub struct Query<T>
pub struct Query<'a, T>
where
T: Engine,
{
/// Execution plan
exec_plan: ExecutionPlan,
/// Execution engine
engine: T,
engine: &'a mut T,
/// Bucket map
bucket_map: HashMap<usize, Buckets>,
}
impl<T> Query<T>
impl<'a, T> Query<'a, T>
where
T: Engine,
{
......@@ -77,7 +78,7 @@ where
/// - Failed to build AST.
/// - Failed to build IR plan.
/// - Failed to apply optimizing transformations to IR plan.
pub fn new(engine: T, sql: &str) -> Result<Self, QueryPlannerError>
pub fn new(engine: &'a mut T, sql: &str) -> Result<Self, QueryPlannerError>
where
T::Metadata: Metadata,
{
......@@ -93,6 +94,7 @@ where
}
/// Get the execution plan of the query.
#[must_use]
pub fn get_exec_plan(&self) -> &ExecutionPlan {
&self.exec_plan
}
......
use std::collections::HashSet;
use traversal::DftPost;
use crate::errors::QueryPlannerError;
use crate::executor::engine::Engine;
use crate::executor::Query;
......@@ -5,8 +9,6 @@ use crate::ir::distribution::Distribution;
use crate::ir::expression::Expression;
use crate::ir::operator::{Bool, Relational};
use crate::ir::transformation::redistribution::MotionPolicy;
use std::collections::HashSet;
use traversal::DftPost;
/// Buckets are used to determine which nodes to send the query to.
#[derive(Clone, Debug, PartialEq)]
......@@ -58,7 +60,7 @@ impl Buckets {
}
}
impl<T> Query<T>
impl<'a, T> Query<'a, T>
where
T: Engine,
{
......
......@@ -14,8 +14,8 @@ fn simple_union_query() {
) as "t3"
WHERE "id" = 1"#;
let engine = EngineMock::new();
let mut query = Query::new(engine, query).unwrap();
let mut engine = EngineMock::new();
let mut query = Query::new(&mut engine, query).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
......@@ -35,8 +35,8 @@ fn simple_disjunction_in_union_query() {
) as "t3"
WHERE ("id" = 1) OR ("id" = 100)"#;
let engine = EngineMock::new();
let mut query = Query::new(engine, query).unwrap();
let mut engine = EngineMock::new();
let mut query = Query::new(&mut engine, query).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
......@@ -61,8 +61,8 @@ fn complex_shard_key_union_query() {
WHERE "sys_op" > 1) AS "t3"
WHERE "identification_number" = 1 AND "product_code" = '222'"#;
let engine = EngineMock::new();
let mut query = Query::new(engine, query).unwrap();
let mut engine = EngineMock::new();
let mut query = Query::new(&mut engine, query).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
......@@ -90,8 +90,8 @@ fn union_complex_cond_query() {
AND ("product_code" = '222'
OR "product_code" = '111')"#;
let engine = EngineMock::new();
let mut query = Query::new(engine, query).unwrap();
let mut engine = EngineMock::new();
let mut query = Query::new(&mut engine, query).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
......@@ -123,8 +123,8 @@ fn union_query_conjunction() {
UNION ALL
SELECT * FROM "test_space_hist" WHERE "id" = 2"#;
let engine = EngineMock::new();
let mut query = Query::new(engine, query).unwrap();
let mut engine = EngineMock::new();
let mut query = Query::new(&mut engine, query).unwrap();
let plan = query.exec_plan.get_ir_plan();
let top = plan.get_top().unwrap();
let buckets = query.bucket_discovery(top).unwrap();
......
......@@ -54,9 +54,6 @@ pub trait Engine {
where
Self: Sized;
/// Checking that metadata isn't empty
fn has_metadata(&self) -> bool;
/// Clear metadata information
fn clear_metadata(&mut self);
......
......@@ -36,54 +36,17 @@ impl Engine for Runtime {
&self.metadata
}
fn has_metadata(&self) -> bool {
self.metadata.is_empty()
}
fn clear_metadata(&mut self) {
self.metadata = ClusterAppConfig::new();
}
fn load_metadata(&mut self) -> Result<(), QueryPlannerError> {
let lua = tarantool::lua_state();
match lua.exec(
r#"
local cartridge = require('cartridge')
function get_schema()
return cartridge.get_schema()
end
function get_waiting_timeout()
local cfg = cartridge.config_get_readonly()
if cfg["executor_waiting_timeout"] == nil then
return 0
end
return cfg["executor_waiting_timeout"]
end
"#,
) {
Ok(_) => {}
Err(e) => {
say(
SayLevel::Error,
file!(),
line!().try_into().unwrap_or(0),
Option::from("load metadata"),
&format!("{:?}", e),
);
return Err(QueryPlannerError::LuaError(format!(
"Failed to load Lua code: {:?}",
e
)));
}
if !&self.metadata.is_empty() {
return Ok(());
}
let lua = tarantool::lua_state();
let get_schema: LuaFunction<_> = lua.eval("return get_schema;").unwrap();
let res: String = match get_schema.call() {
......
......@@ -157,10 +157,6 @@ impl Engine for EngineMock {
&self.metadata
}
fn has_metadata(&self) -> bool {
self.metadata.tables.is_empty()
}
fn clear_metadata(&mut self) {
self.metadata.tables.clear();
}
......
use super::*;
use pretty_assertions::assert_eq;
use crate::executor::engine::mock::EngineMock;
use crate::executor::result::Value;
use crate::executor::vtable::VirtualTable;
use crate::ir::relation::{Column, Type};
use crate::ir::value::Value as IrValue;
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn shard_query() {
let sql = r#"SELECT "FIRST_NAME" FROM "test_space" where "id" = 1"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let mut expected = BoxExecuteFormat::new();
let bucket = query.engine.determine_bucket_id("1");
......@@ -37,9 +39,9 @@ fn shard_union_query() {
WHERE "sys_op" > 1) AS "t3"
WHERE "id" = 1"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let mut expected = BoxExecuteFormat::new();
let bucket = query.engine.determine_bucket_id("1");
......@@ -67,9 +69,9 @@ fn shard_union_query() {
#[test]
fn map_reduce_query() {
let sql = r#"SELECT "product_code" FROM "hash_testing" where "identification_number" = 1 and "product_code" = '457'"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let mut expected = BoxExecuteFormat::new();
let bucket = query.engine.determine_bucket_id(&["1", "457"].join(""));
......@@ -92,9 +94,9 @@ fn map_reduce_query() {
fn linker_test() {
let sql = r#"SELECT "FIRST_NAME" FROM "test_space" where "id" in
(SELECT "identification_number" FROM "hash_testing" where "identification_number" > 1)"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
let virtual_table = virtual_table_23();
query.engine.add_virtual_table(motion_id, virtual_table);
......@@ -142,9 +144,9 @@ fn union_linker_test() {
) as "t2"
WHERE "product_code" = '123')"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
let virtual_table = virtual_table_23();
query.engine.add_virtual_table(motion_id, virtual_table);
......@@ -221,9 +223,9 @@ INNER JOIN
ON "t3"."id" = "t8"."identification_number"
WHERE "t3"."id" = 2 AND "t8"."identification_number" = 2"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
let mut virtual_table = virtual_table_23();
virtual_table.set_alias("\"t8\"").unwrap();
......@@ -271,9 +273,9 @@ fn join_linker2_test() {
select "id" as "id1", "id" as "id2" from "test_space_hist"
) as "t2" on "t1"."id" = 1"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
let mut virtual_table = VirtualTable::new();
......@@ -323,9 +325,9 @@ fn join_linker3_test() {
(SELECT "id" as "id1", "FIRST_NAME" FROM "test_space") AS "t2"
ON "t2"."id1" = 1"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
let mut virtual_table = VirtualTable::new();
......@@ -375,9 +377,9 @@ fn join_linker4_test() {
on t1."id" = t2."r_id" and
t1."FIRST_NAME" = (SELECT "FIRST_NAME" as "fn" FROM "test_space" WHERE "id" = 1)"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion_t2_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
let mut virtual_t2 = VirtualTable::new();
......@@ -440,9 +442,9 @@ fn anonymous_col_index_test() {
WHERE "id" in (SELECT "identification_number" FROM "hash_testing" WHERE "product_units" < 3)
OR "id" in (SELECT "identification_number" FROM "hash_testing" WHERE "product_units" > 5)"#;
let engine = EngineMock::new();
let mut engine = EngineMock::new();
let mut query = Query::new(engine, sql).unwrap();
let mut query = Query::new(&mut engine, sql).unwrap();
let motion1_id = query.exec_plan.get_ir_plan().get_slices().unwrap()[0][0];
query
.engine
......
......@@ -57,7 +57,9 @@ pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c
let args = args.into_struct::<BucketCalcArgs>().unwrap();
QUERY_ENGINE.with(|e| {
let result = e.clone().into_inner().determine_bucket_id(&args.rec);
let engine = &mut *e.borrow_mut();
let result = engine.determine_bucket_id(&args.rec);
ctx.return_mp(&result).unwrap();
0
})
......@@ -66,16 +68,11 @@ pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c
#[no_mangle]
pub extern "C" fn calculate_bucket_id_by_dict(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
QUERY_ENGINE.with(|e| {
let mut engine = e.clone().into_inner();
let engine = &mut *e.borrow_mut();
// Update cartridge schema after cache invalidation by calling `apply_config()` in lua code.
if engine.has_metadata() {
match engine.load_metadata() {
Ok(_) => *e.borrow_mut() = engine.clone(),
Err(e) => {
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
}
};
}
if let Err(e) = engine.load_metadata() {
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
};
// Closure for more concise error propagation from calls nested in the bucket calculation
let propagate_err = || -> Result<u64, QueryPlannerError> {
......@@ -90,7 +87,7 @@ pub extern "C" fn calculate_bucket_id_by_dict(ctx: FunctionCtx, args: FunctionAr
acc.push_str(s.as_str());
acc
});
Ok(e.clone().into_inner().determine_bucket_id(fk.as_str()))
Ok(engine.determine_bucket_id(fk.as_str()))
};
match propagate_err() {
......@@ -115,17 +112,12 @@ pub extern "C" fn execute_query(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
let args: Tuple = args.into();
let args = args.into_struct::<Args>().unwrap();
QUERY_ENGINE.with(|s| {
let mut engine = s.clone().into_inner();
QUERY_ENGINE.with(|e| {
let engine = &mut *e.borrow_mut();
// Update cartridge schema after cache invalidation by calling `apply_config()` in lua code.
if engine.has_metadata() {
match engine.load_metadata() {
Ok(_) => *s.borrow_mut() = engine.clone(),
Err(e) => {
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
}
};
}
if let Err(e) = engine.load_metadata() {
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
};
let mut query = match Query::new(engine, args.query.as_str()) {
Ok(q) => q,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment