diff --git a/src/executor/engine.rs b/src/executor/engine.rs index 9b554bdee88aef2695be1b40c5429b0282ed72cf..f99a8cbde6cce7322856df1b36228f24de01bd61 100644 --- a/src/executor/engine.rs +++ b/src/executor/engine.rs @@ -13,6 +13,8 @@ pub trait Metadata { table_name: &str, ) -> Result<crate::ir::relation::Table, QueryPlannerError>; + fn get_exec_waiting_timeout(&self) -> u64; + fn to_name(s: &str) -> String { if let (Some('"'), Some('"')) = (s.chars().next(), s.chars().last()) { s.to_string() diff --git a/src/executor/engine/cartridge.rs b/src/executor/engine/cartridge.rs index 3546afa53711b2eaba6067f65a4aff866eb90dc5..2da91f3a8a0c53757670e694a2073bc31c99aa2b 100644 --- a/src/executor/engine/cartridge.rs +++ b/src/executor/engine/cartridge.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::convert::TryInto; use tarantool::log::{say, SayLevel}; @@ -6,10 +5,11 @@ use tarantool::tlua::LuaFunction; use crate::errors::QueryPlannerError; use crate::executor::bucket::Buckets; -use crate::executor::engine::cartridge::cache::ClusterSchema; +use crate::executor::engine::cartridge::cache::ClusterAppConfig; use crate::executor::engine::cartridge::hash::str_to_bucket_id; use crate::executor::engine::Engine; use crate::executor::ir::ExecutionPlan; +use crate::executor::Metadata; use crate::executor::result::BoxExecuteFormat; use crate::executor::vtable::VirtualTable; @@ -19,13 +19,13 @@ pub mod hash; #[derive(Debug, Clone)] pub struct Runtime { - metadata: ClusterSchema, + metadata: ClusterAppConfig, bucket_count: usize, } /// Implements `Engine` interface for tarantool cartridge application impl Engine for Runtime { - type Metadata = ClusterSchema; + type Metadata = ClusterAppConfig; fn metadata(&self) -> &Self::Metadata { &self.metadata @@ -36,13 +36,50 @@ impl Engine for Runtime { } fn clear_metadata(&mut self) { - self.metadata = ClusterSchema::new(); + self.metadata = ClusterAppConfig::new(); } fn load_metadata(&mut self) -> Result<(), QueryPlannerError> { let lua = tarantool::lua_state(); - let get_schema: LuaFunction<_> = - lua.eval("return require('cartridge').get_schema").unwrap(); + + match lua.exec( + r#" + local cartridge = require('cartridge') + + function get_schema() + return cartridge.get_schema() + end + + function get_waiting_timeout() + local cfg = cartridge.config_get_readonly() + + if cfg["executor_waiting_timeout"] == nil then + return 0 + end + + return cfg["executor_waiting_timeout"] + end + + + "#, + ) { + Ok(_) => {} + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("load metadata"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!( + "Failed to load Lua code: {:?}", + e + ))); + } + } + + let get_schema: LuaFunction<_> = lua.eval("return get_schema;").unwrap(); let res: String = match get_schema.call() { Ok(res) => res, @@ -51,14 +88,34 @@ impl Engine for Runtime { SayLevel::Error, file!(), line!().try_into().unwrap_or(0), - Option::from("load metadata"), + Option::from("getting schema"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + + self.metadata.load_schema(&res)?; + + let timeout: LuaFunction<_> = lua.eval("return get_waiting_timeout;").unwrap(); + + let waiting_timeout: u64 = match timeout.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting waiting timeout"), &format!("{:?}", e), ); return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); } }; - self.metadata.load(&res) + self.metadata.set_exec_waiting_timeout(waiting_timeout); + + Ok(()) } /// Execute sub tree on the nodes @@ -71,14 +128,7 @@ impl Engine for Runtime { let mut result = BoxExecuteFormat::new(); let sql = plan.subtree_as_sql(top_id)?; - match buckets { - Buckets::All => { - result.extend(cluster_exec_query(&sql)?)?; - } - Buckets::Filtered(list) => { - result.extend(bucket_exec_query(list, &sql)?)?; - } - } + result.extend(self.exec_query(&sql, buckets)?)?; Ok(result) } @@ -111,7 +161,7 @@ impl Engine for Runtime { impl Runtime { pub fn new() -> Result<Self, QueryPlannerError> { let mut result = Runtime { - metadata: ClusterSchema::new(), + metadata: ClusterAppConfig::new(), bucket_count: 0, }; @@ -167,30 +217,28 @@ impl Runtime { Ok(()) } -} -/// Send the query to a single bucket and merge results (map-reduce). -fn bucket_exec_query( - buckets: &HashSet<u64>, - query: &str, -) -> Result<BoxExecuteFormat, QueryPlannerError> { - let lua = tarantool::lua_state(); - match lua.exec( - r#" - local vshard = require('vshard') + fn exec_query( + &self, + query: &str, + buckets: &Buckets, + ) -> Result<BoxExecuteFormat, QueryPlannerError> { + let lua = tarantool::lua_state(); + match lua.exec( + r#"local vshard = require('vshard') local yaml = require('yaml') local log = require('log') ---get_uniq_replicaset_for_buckets - gets unique set of replicaset by bucket list ---@param buckets table - list of buckets. function get_uniq_replicaset_for_buckets(buckets) - local res = {} local uniq_replicas = {} for _, bucket_id in pairs(buckets) do local current_replicaset = vshard.router.route(bucket_id) uniq_replicas[current_replicaset.uuid] = current_replicaset end + local res = {} for _, r in pairs(uniq_replicas) do table.insert(res, r) end @@ -198,8 +246,20 @@ fn bucket_exec_query( return res end - function execute_sql(buckets, query) - local replicas = get_uniq_replicaset_for_buckets(buckets) + function execute_sql(query, buckets, waiting_timeout) + log.debug("Execution query: " .. query) + log.debug("Execution waiting timeout " .. tostring(waiting_timeout) .. "s") + + local replicas = nil + + if next(buckets) == nil then + replicas = vshard.router.routeall() + log.debug("Execution query on all instaces") + else + replicas = get_uniq_replicaset_for_buckets(buckets) + log.debug("Execution query on some instaces") + end + local result = nil local futures = {} @@ -209,11 +269,10 @@ fn bucket_exec_query( error(err) end table.insert(futures, future) - log.debug("Execute a query " .. query .. " on replica") end for _, future in ipairs(futures) do - future:wait_result(360) + future:wait_result(waiting_timeout) local res = future:result() if res[1] == nil then @@ -229,132 +288,51 @@ fn bucket_exec_query( end end - return result + return result end "#, - ) { - Ok(_) => {} - Err(e) => { - say( - SayLevel::Error, - file!(), - line!().try_into().unwrap_or(0), - Option::from("exec_query"), - &format!("{:?}", e), - ); - return Err(QueryPlannerError::LuaError(format!( - "Failed lua code loading: {:?}", - e - ))); - } - } - - let exec_sql: LuaFunction<_> = lua.get("execute_sql").ok_or_else(|| { - QueryPlannerError::LuaError("Lua function `execute_sql` not found".into()) - })?; - - let lua_buckets: Vec<u64> = buckets.iter().copied().collect(); - let res: BoxExecuteFormat = match exec_sql.call_with_args((lua_buckets, query)) { - Ok(v) => v, - Err(e) => { - say( - SayLevel::Error, - file!(), - line!().try_into().unwrap_or(0), - Option::from("exec_query"), - &format!("{:?}", e), - ); - return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + ) { + Ok(_) => {} + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("exec_query"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!( + "Failed lua code loading: {:?}", + e + ))); + } } - }; - Ok(res) -} + let exec_sql: LuaFunction<_> = lua.get("execute_sql").ok_or_else(|| { + QueryPlannerError::LuaError("Lua function `execute_sql` not found".into()) + })?; -/// Send the query to all instances and merge results (map-reduce). -fn cluster_exec_query(query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> { - say( - SayLevel::Debug, - file!(), - line!().try_into().unwrap_or(0), - None, - &format!("Execute a query {:?} on all instances", query), - ); - - let lua = tarantool::lua_state(); - - match lua.exec( - r#" - local vshard = require('vshard') - local yaml = require('yaml') - - function map_reduce_execute_sql(query) - local replicas = vshard.router.routeall() - - local result = nil - local futures = {} - for _, replica in pairs(replicas) do - local future, err = replica:callro("box.execute", { query }, {is_async = true}) - if err ~= nil then - error(err) - end - table.insert(futures, future) - end - - for _, future in ipairs(futures) do - future:wait_result(360) - local res = future:result() - - if res[1] == nil then - error(res[2]) - end + let lua_buckets = match buckets { + Buckets::All => vec![], + Buckets::Filtered(list) => list.iter().copied().collect(), + }; - if result == nil then - result = res[1] - else - for _, item in pairs(res[1].rows) do - table.insert(result.rows, item) - end - end - end + let waiting_timeout = &self.metadata().get_exec_waiting_timeout(); + let res: BoxExecuteFormat = + match exec_sql.call_with_args((query, lua_buckets, waiting_timeout)) { + Ok(v) => v, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("exec_query"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; - return result - end - "#, - ) { - Ok(_) => {} - Err(e) => { - say( - SayLevel::Error, - file!(), - line!().try_into().unwrap_or(0), - Option::from("cluster_exec_query"), - &format!("{:?}", e), - ); - return Err(QueryPlannerError::LuaError(format!( - "Failed lua code loading: {:?}", - e - ))); - } + Ok(res) } - - let exec_sql: LuaFunction<_> = lua.get("map_reduce_execute_sql").ok_or_else(|| { - QueryPlannerError::LuaError("Lua function `map_reduce_execute_sql` not found".into()) - })?; - - let res: BoxExecuteFormat = match exec_sql.call_with_args(query) { - Ok(v) => v, - Err(e) => { - say( - SayLevel::Error, - file!(), - line!().try_into().unwrap_or(0), - Option::from("cluster_exec_query"), - &format!("{:?}", e), - ); - return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); - } - }; - - Ok(res) } diff --git a/src/executor/engine/cartridge/cache.rs b/src/executor/engine/cartridge/cache.rs index 080ef7ebf5a93592d51f0b684594911d0d8549eb..656e557e4959fff6bd11531f015114a611ec0ff6 100644 --- a/src/executor/engine/cartridge/cache.rs +++ b/src/executor/engine/cartridge/cache.rs @@ -15,18 +15,22 @@ use self::yaml_rust::yaml; /// Information based on tarantool cartridge schema. Cache knows nothing about bucket distribution in the cluster, /// as it is managed by Tarantool's vshard module. #[derive(Debug, Clone, PartialEq)] -pub struct ClusterSchema { +pub struct ClusterAppConfig { /// Tarantool cartridge schema schema: yaml::Yaml, + /// Execute response waiting timeout in seconds. + waiting_timeout: u64, + /// IR table segments from the cluster spaces tables: HashMap<String, Table>, } -impl ClusterSchema { +impl ClusterAppConfig { pub fn new() -> Self { - ClusterSchema { + ClusterAppConfig { schema: yaml::Yaml::Null, + waiting_timeout: 360, tables: HashMap::new(), } } @@ -35,11 +39,13 @@ impl ClusterSchema { /// /// # Errors /// Returns `QueryPlannerError` when process was terminated. - pub fn load(&mut self, s: &str) -> Result<(), QueryPlannerError> { + pub fn load_schema(&mut self, s: &str) -> Result<(), QueryPlannerError> { if let Ok(docs) = YamlLoader::load_from_str(s) { - self.schema = docs[0].clone(); - self.init_table_segments()?; - return Ok(()); + if let Some(doc) = docs.get(0) { + self.schema = doc.clone(); + self.init_table_segments()?; + return Ok(()); + } } Err(QueryPlannerError::InvalidClusterSchema) @@ -61,6 +67,10 @@ impl ClusterSchema { result } + pub(in crate::executor::engine::cartridge) fn is_empty(&self) -> bool { + self.schema.is_null() + } + /// Transform space information from schema to table segments /// /// # Errors @@ -108,7 +118,7 @@ impl ClusterSchema { None => return Err(QueryPlannerError::SpaceFormatNotFound), }; - let table_name: String = ClusterSchema::to_name(current_space_name); + let table_name: String = ClusterAppConfig::to_name(current_space_name); let keys_str = keys.iter().map(String::as_str).collect::<Vec<&str>>(); let t = Table::new_seg(&table_name, fields, keys_str.as_slice())?; self.tables.insert(table_name, t); @@ -120,12 +130,15 @@ impl ClusterSchema { Ok(()) } - pub(in crate::executor::engine::cartridge) fn is_empty(&self) -> bool { - self.schema.is_null() + /// Setup response waiting timeout for executor + pub fn set_exec_waiting_timeout(&mut self, timeout: u64) { + if timeout > 0 { + self.waiting_timeout = timeout; + } } } -impl Metadata for ClusterSchema { +impl Metadata for ClusterAppConfig { /// Get table segment form cache by table name /// /// # Errors @@ -137,6 +150,11 @@ impl Metadata for ClusterSchema { None => Err(QueryPlannerError::SpaceNotFound), } } + + /// Get response waiting timeout for executor + fn get_exec_waiting_timeout(&self) -> u64 { + self.waiting_timeout + } } #[cfg(test)] diff --git a/src/executor/engine/cartridge/cache/tests.rs b/src/executor/engine/cartridge/cache/tests.rs index 4b973d2027be08fb929afc1d765f94f3f0000111..901eb527bfe7135827eb398fda597ff01d8051fd 100644 --- a/src/executor/engine/cartridge/cache/tests.rs +++ b/src/executor/engine/cartridge/cache/tests.rs @@ -82,8 +82,8 @@ fn test_yaml_schema_parser() { - \"identification_number\" - \"product_code\""; - let mut s = ClusterSchema::new(); - s.load(test_schema).unwrap(); + let mut s = ClusterAppConfig::new(); + s.load_schema(test_schema).unwrap(); let mut expected_keys = Vec::new(); expected_keys.push("\"identification_number\"".to_string()); @@ -136,8 +136,8 @@ fn test_getting_table_segment() { - \"identification_number\" - \"product_code\""; - let mut s = ClusterSchema::new(); - s.load(test_schema).unwrap(); + let mut s = ClusterAppConfig::new(); + s.load_schema(test_schema).unwrap(); let expected = Table::new_seg( "\"hash_testing\"", @@ -158,3 +158,13 @@ fn test_getting_table_segment() { ); assert_eq!(s.get_table_segment("\"hash_testing\"").unwrap(), expected) } + +#[test] +fn test_waiting_timeout() { + let mut s = ClusterAppConfig::new(); + s.set_exec_waiting_timeout(200); + + assert_ne!(s.get_exec_waiting_timeout(), 360); + + assert_eq!(s.get_exec_waiting_timeout(), 200); +} diff --git a/src/executor/engine/mock.rs b/src/executor/engine/mock.rs index 0456ef524e7e5f3a01d70843df2b9fda48d305d0..22ac00400b0f2062f10e21bae3c3c4566c47d62e 100644 --- a/src/executor/engine/mock.rs +++ b/src/executor/engine/mock.rs @@ -24,6 +24,10 @@ impl Metadata for MetadataMock { None => Err(QueryPlannerError::SpaceNotFound), } } + + fn get_exec_waiting_timeout(&self) -> u64 { + 0 + } } impl MetadataMock { diff --git a/test_app/test/helper.lua b/test_app/test/helper.lua index 4ca503ae24421b1889f4ec1a9cb362f37f275d8b..ed43ded661bfca5da0441edbd6cd9ee3a6c1d5f9 100644 --- a/test_app/test/helper.lua +++ b/test_app/test/helper.lua @@ -44,6 +44,7 @@ helper.cluster = cartridge_helpers.Cluster:new({ }) local config = { + ["executor_waiting_timeout"] = 200, ["schema"] = { spaces = { testing_space = {