From 4e1574c12e2571d34f3438598fbe7f6e9ab28182 Mon Sep 17 00:00:00 2001 From: Arseniy Volynets <vol0ncar@yandex.ru> Date: Tue, 16 Jan 2024 14:23:35 +0300 Subject: [PATCH] refactor: tracing - refactor tracing api to take tracer from library user. Now there is a trait for Tracer and sbroad does really know what tracer is being passed - All tracing tables were moved out to Picodata - remove tracing from cartridge: as it is not used and no need to maintain it --- Cargo.lock | 2 - .../cartridge/roles/sbroad-router.lua | 1 - .../cartridge/roles/sbroad-storage.lua | 1 - sbroad-cartridge/src/api.rs | 1 - sbroad-cartridge/src/api/exec_query.rs | 168 +++-- sbroad-cartridge/src/api/statistics.rs | 21 - sbroad-cartridge/src/cartridge.rs | 30 +- sbroad-cartridge/src/cartridge/config.rs | 32 - sbroad-cartridge/src/cartridge/router.rs | 28 +- sbroad-cartridge/src/cartridge/storage.rs | 25 +- sbroad-cartridge/src/init.lua | 25 - sbroad-cartridge/src/router.lua | 19 +- sbroad-cartridge/test_app/app/roles/api.lua | 18 - .../test_app/test/integration/api_test.lua | 40 -- .../test_app/test/integration/insert_test.lua | 76 --- sbroad-core/Cargo.toml | 3 +- sbroad-core/src/backend/sql/ir.rs | 21 +- sbroad-core/src/core.lua | 9 - sbroad-core/src/executor.rs | 1 - sbroad-core/src/executor/protocol.rs | 83 +-- sbroad-core/src/lib.rs | 3 - sbroad-core/src/otm.rs | 357 ++++------- sbroad-core/src/otm/fiber.rs | 2 +- sbroad-core/src/otm/statistics.rs | 165 ----- sbroad-core/src/otm/statistics/eviction.rs | 60 -- sbroad-core/src/otm/statistics/table.rs | 598 ------------------ 26 files changed, 245 insertions(+), 1544 deletions(-) delete mode 100644 sbroad-cartridge/src/api/statistics.rs delete mode 100644 sbroad-core/src/otm/statistics.rs delete mode 100644 sbroad-core/src/otm/statistics/eviction.rs delete mode 100644 sbroad-core/src/otm/statistics/table.rs diff --git a/Cargo.lock b/Cargo.lock index 56c5c65cc0..4d19785b1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1146,9 +1146,7 @@ dependencies = [ "blake3", "hash32", "itertools", - "lazy_static", "opentelemetry", - "opentelemetry-jaeger", "pest", "pest_derive", "pretty_assertions", diff --git a/sbroad-cartridge/cartridge/roles/sbroad-router.lua b/sbroad-cartridge/cartridge/roles/sbroad-router.lua index 8b88703ae6..1f6db9d39e 100644 --- a/sbroad-cartridge/cartridge/roles/sbroad-router.lua +++ b/sbroad-cartridge/cartridge/roles/sbroad-router.lua @@ -13,7 +13,6 @@ local function init(opts) -- luacheck: no unused args sbroad_router.init(opts.is_master) sbroad_common.init(opts.is_master) - sbroad_common.init_statistics() return true end diff --git a/sbroad-cartridge/cartridge/roles/sbroad-storage.lua b/sbroad-cartridge/cartridge/roles/sbroad-storage.lua index cb1402da3a..b1d0b061cc 100644 --- a/sbroad-cartridge/cartridge/roles/sbroad-storage.lua +++ b/sbroad-cartridge/cartridge/roles/sbroad-storage.lua @@ -9,7 +9,6 @@ local function init(opts) -- luacheck: no unused args _G.sbroad.calculate_bucket_id = sbroad_common.calculate_bucket_id sbroad_common.init(opts.is_master) - sbroad_common.init_statistics() sbroad_storage.init(opts.is_master) return true diff --git a/sbroad-cartridge/src/api.rs b/sbroad-cartridge/src/api.rs index bc4620af3b..27b3dc1ea6 100644 --- a/sbroad-cartridge/src/api.rs +++ b/sbroad-cartridge/src/api.rs @@ -16,4 +16,3 @@ pub mod calculate_bucket_id; pub mod exec_query; mod helper; pub mod invalidate_cached_schema; -pub mod statistics; diff --git a/sbroad-cartridge/src/api/exec_query.rs b/sbroad-cartridge/src/api/exec_query.rs index a24721d30a..dc30c0c4f0 100644 --- a/sbroad-cartridge/src/api/exec_query.rs +++ b/sbroad-cartridge/src/api/exec_query.rs @@ -8,13 +8,13 @@ use sbroad::backend::sql::ir::PatternWithParams; use sbroad::executor::protocol::{EncodedRequiredData, RequiredData}; use sbroad::executor::Query; use sbroad::log::tarantool_error; -use sbroad::otm::{child_span, query_span, QueryTracer}; +use sbroad::otm::child_span; use sbroad::{debug, error}; /// Dispatch parameterized SQL query from coordinator to the segments. #[no_mangle] pub extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { - let mut lua_params = match PatternWithParams::try_from(args) { + let lua_params = match PatternWithParams::try_from(args) { Ok(params) => params, Err(e) => { error!(Option::from("dispatch_query"), &format!("Error: {e}")); @@ -26,66 +26,52 @@ pub extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_in // As a side effect, we can't trace load_config() call itself (sic!). let ret_code = load_config(&COORDINATOR_ENGINE); - let id = lua_params.clone_id(); - let ctx = lua_params.extract_context(); - let tracer: QueryTracer = lua_params.tracer.clone(); - - query_span( - "\"api.router\"", - &id, - &tracer, - &ctx, - &lua_params.pattern, - || { - if ret_code != 0 { - return ret_code; + if ret_code != 0 { + return ret_code; + } + COORDINATOR_ENGINE.with(|engine| { + let runtime = match engine.try_borrow() { + Ok(runtime) => runtime, + Err(e) => { + return tarantool_error(&format!( + "Failed to borrow the runtime while dispatching the query: {e}" + )); } - COORDINATOR_ENGINE.with(|engine| { - let runtime = match engine.try_borrow() { - Ok(runtime) => runtime, - Err(e) => { - return tarantool_error(&format!( - "Failed to borrow the runtime while dispatching the query: {e}" - )); - } - }; - let mut query = match Query::new(&*runtime, &lua_params.pattern, lua_params.params) - { - Ok(q) => q, - Err(e) => { - error!(Option::from("query dispatch"), &format!("{e:?}")); - return tarantool_error(&e.to_string()); - } - }; - if let Ok(true) = query.is_ddl() { - return tarantool_error("DDL queries are not supported"); - } - if let Ok(true) = query.is_acl() { - return tarantool_error("ACL queries are not supported"); - } + }; + let mut query = match Query::new(&*runtime, &lua_params.pattern, lua_params.params) { + Ok(q) => q, + Err(e) => { + error!(Option::from("query dispatch"), &format!("{e:?}")); + return tarantool_error(&e.to_string()); + } + }; + if let Ok(true) = query.is_ddl() { + return tarantool_error("DDL queries are not supported"); + } + if let Ok(true) = query.is_acl() { + return tarantool_error("ACL queries are not supported"); + } - match query.dispatch() { - Ok(result) => child_span("\"tarantool.tuple.return\"", || { - if let Some(tuple) = (*result).downcast_ref::<Tuple>() { - debug!( - Option::from("query dispatch"), - &format!("Returning tuple: {tuple:?}") - ); - f_ctx.return_tuple(tuple).unwrap(); - 0 - } else { - error!( - Option::from("query dispatch"), - &format!("Failed to downcast result: {result:?}") - ); - tarantool_error("Unsupported result type") - } - }), - Err(e) => tarantool_error(&e.to_string()), + match query.dispatch() { + Ok(result) => child_span("\"tarantool.tuple.return\"", || { + if let Some(tuple) = (*result).downcast_ref::<Tuple>() { + debug!( + Option::from("query dispatch"), + &format!("Returning tuple: {tuple:?}") + ); + f_ctx.return_tuple(tuple).unwrap(); + 0 + } else { + error!( + Option::from("query dispatch"), + &format!("Failed to downcast result: {result:?}") + ); + tarantool_error("Unsupported result type") } - }) - }, - ) + }), + Err(e) => tarantool_error(&e.to_string()), + } + }) } #[no_mangle] @@ -115,42 +101,36 @@ pub extern "C" fn execute(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { } }; - let id = required.trace_id().to_string(); - let ctx = required.extract_context(); - let tracer = required.tracer(); - - query_span("\"api.storage\"", &id, &tracer, &ctx, "", || { - SEGMENT_ENGINE.with(|engine| { - let runtime = match engine.try_borrow_mut() { - Ok(runtime) => runtime, - Err(e) => { - return tarantool_error(&format!( - "Failed to borrow the runtime while executing the query: {e}" - )); - } - }; - match runtime.execute_plan(&mut required, &mut raw_optional) { - Ok(result) => { - if let Some(tuple) = (*result).downcast_ref::<Tuple>() { - f_ctx.return_tuple(tuple).unwrap(); - 0 - } else if let Some(mp) = (*result).downcast_ref::<Vec<u8>>() { - f_ctx.return_mp(mp.as_slice()).unwrap(); - 0 - } else { - error!( - Option::from("execute"), - &format!("Failed to downcast result: {result:?}") - ); - tarantool_error("Unsupported result type") - } - } - Err(e) => { - let error = format!("Failed to execute the query: {e}"); - error!(Option::from("execute"), &error); - tarantool_error(&error) + SEGMENT_ENGINE.with(|engine| { + let runtime = match engine.try_borrow_mut() { + Ok(runtime) => runtime, + Err(e) => { + return tarantool_error(&format!( + "Failed to borrow the runtime while executing the query: {e}" + )); + } + }; + match runtime.execute_plan(&mut required, &mut raw_optional) { + Ok(result) => { + if let Some(tuple) = (*result).downcast_ref::<Tuple>() { + f_ctx.return_tuple(tuple).unwrap(); + 0 + } else if let Some(mp) = (*result).downcast_ref::<Vec<u8>>() { + f_ctx.return_mp(mp.as_slice()).unwrap(); + 0 + } else { + error!( + Option::from("execute"), + &format!("Failed to downcast result: {result:?}") + ); + tarantool_error("Unsupported result type") } } - }) + Err(e) => { + let error = format!("Failed to execute the query: {e}"); + error!(Option::from("execute"), &error); + tarantool_error(&error) + } + } }) } diff --git a/sbroad-cartridge/src/api/statistics.rs b/sbroad-cartridge/src/api/statistics.rs deleted file mode 100644 index 0148c61689..0000000000 --- a/sbroad-cartridge/src/api/statistics.rs +++ /dev/null @@ -1,21 +0,0 @@ -use sbroad::debug; -use sbroad::otm::statistics::table::{QUERY, SPAN, STAT}; -use std::os::raw::c_int; - -/// Initialize query statistics tables. -/// -/// Though the function always returns a success, it can fail to create -/// the table (for example on read-only replica). In this case, the -/// warning will be logged, but the function will return success. -#[allow(clippy::module_name_repetitions)] -#[no_mangle] -pub extern "C" fn init_statistics() -> c_int { - debug!( - Option::from("init_statistics"), - "Initializing statistics tables" - ); - QUERY.with(|_| {}); - SPAN.with(|_| {}); - STAT.with(|_| {}); - 0 -} diff --git a/sbroad-cartridge/src/cartridge.rs b/sbroad-cartridge/src/cartridge.rs index 9fe1301ed8..b2049952e1 100644 --- a/sbroad-cartridge/src/cartridge.rs +++ b/sbroad-cartridge/src/cartridge.rs @@ -6,38 +6,10 @@ pub mod storage; use std::cell::Ref; -use opentelemetry::global::{set_text_map_propagator, set_tracer_provider}; -use opentelemetry::sdk::propagation::{TextMapCompositePropagator, TraceContextPropagator}; use sbroad::error; -use sbroad::errors::{Action, SbroadError}; -use sbroad::otm::update_global_tracer; +use sbroad::errors::SbroadError; use tarantool::tlua::LuaFunction; -static SERVICE_NAME: &str = "sbroad"; - -/// Update the opentelemetry global trace provider and tracer. -/// -/// # Errors -/// - failed to build OTM global trace provider -pub fn update_tracing(host: &str, port: u16) -> Result<(), SbroadError> { - let propagator = TextMapCompositePropagator::new(vec![Box::new(TraceContextPropagator::new())]); - set_text_map_propagator(propagator); - let provider = opentelemetry_jaeger::new_pipeline() - .with_agent_endpoint(format!("{host}:{port}")) - .with_service_name(SERVICE_NAME) - .build_simple() - .map_err(|e| { - SbroadError::FailedTo( - Action::Build, - None, - format!("OTM global trace provider: {e}"), - ) - })?; - set_tracer_provider(provider); - update_global_tracer(); - Ok(()) -} - /// Cartridge cluster configuration provider. pub trait ConfigurationProvider: Sized { type Configuration; diff --git a/sbroad-cartridge/src/cartridge/config.rs b/sbroad-cartridge/src/cartridge/config.rs index 960f9026cf..fe2c8f5f9a 100644 --- a/sbroad-cartridge/src/cartridge/config.rs +++ b/sbroad-cartridge/src/cartridge/config.rs @@ -28,12 +28,6 @@ pub struct RouterConfiguration { /// Sharding column names. sharding_column: String, - /// Jaeger agent host. - jaeger_agent_host: String, - - /// Jaeger agent port. - jaeger_agent_port: u16, - /// IR table segments from the cluster spaces tables: HashMap<String, Table>, @@ -53,8 +47,6 @@ impl RouterConfiguration { RouterConfiguration { waiting_timeout: 360, cache_capacity: DEFAULT_CAPACITY, - jaeger_agent_host: "localhost".to_string(), - jaeger_agent_port: 6831, tables: HashMap::new(), sharding_column: String::new(), functions: HashMap::new(), @@ -263,24 +255,6 @@ impl RouterConfiguration { Ok(()) } - #[must_use] - pub fn get_jaeger_agent_host(&self) -> &str { - self.jaeger_agent_host.as_str() - } - - #[must_use] - pub fn get_jaeger_agent_port(&self) -> u16 { - self.jaeger_agent_port - } - - pub fn set_jaeger_agent_host(&mut self, host: String) { - self.jaeger_agent_host = host; - } - - pub fn set_jaeger_agent_port(&mut self, port: u16) { - self.jaeger_agent_port = port; - } - /// Setup response waiting timeout for executor pub fn set_waiting_timeout(&mut self, timeout: u64) { if timeout > 0 { @@ -351,10 +325,6 @@ pub struct StorageConfiguration { /// If a new statement is bigger doesn't fit into the cache, /// it would not be cached but executed directly. pub storage_size_bytes: usize, - /// Jaeger agent host - pub jaeger_agent_host: String, - /// Jaeger agent port - pub jaeger_agent_port: u16, } impl Default for StorageConfiguration { @@ -369,8 +339,6 @@ impl StorageConfiguration { StorageConfiguration { storage_capacity: 0, storage_size_bytes: 0, - jaeger_agent_host: "localhost".to_string(), - jaeger_agent_port: 6831, } } diff --git a/sbroad-cartridge/src/cartridge/router.rs b/sbroad-cartridge/src/cartridge/router.rs index 5600d58f32..82e0bf8e45 100644 --- a/sbroad-cartridge/src/cartridge/router.rs +++ b/sbroad-cartridge/src/cartridge/router.rs @@ -15,8 +15,8 @@ use std::rc::Rc; use sbroad::cbo::histogram::Scalar; use tarantool::tlua::LuaFunction; +use crate::cartridge::bucket_count; use crate::cartridge::config::RouterConfiguration; -use crate::cartridge::{bucket_count, update_tracing}; use sbroad::executor::protocol::Binary; use sbroad::error; @@ -88,26 +88,6 @@ impl ConfigurationProvider for RouterRuntime { } }; - let jaeger_agent_host: LuaFunction<_> = - lua.eval("return get_jaeger_agent_host;").unwrap(); - let jaeger_host: String = match jaeger_agent_host.call() { - Ok(res) => res, - Err(e) => { - error!(Option::from("getting jaeger agent host"), &format!("{e:?}"),); - return Err(SbroadError::LuaError(format!("{e:?}"))); - } - }; - - let jaeger_agent_port: LuaFunction<_> = - lua.eval("return get_jaeger_agent_port;").unwrap(); - let jaeger_port: u16 = match jaeger_agent_port.call() { - Ok(res) => res, - Err(e) => { - error!(Option::from("getting jaeger agent port"), &format!("{e:?}"),); - return Err(SbroadError::LuaError(format!("{e:?}"))); - } - }; - let waiting_timeout: LuaFunction<_> = lua.eval("return get_waiting_timeout;").unwrap(); let timeout: u64 = match waiting_timeout.call() { Ok(res) => res, @@ -148,17 +128,11 @@ impl ConfigurationProvider for RouterRuntime { }; let mut metadata = RouterConfiguration::new(); - metadata.set_jaeger_agent_host(jaeger_host); - metadata.set_jaeger_agent_port(jaeger_port); metadata.set_waiting_timeout(timeout); metadata.set_cache_capacity(router_capacity); metadata.set_sharding_column(normalize_name_from_schema(column.as_str())); // We should always load the schema **after** setting the sharding column. metadata.load_schema(&schema)?; - update_tracing( - metadata.get_jaeger_agent_host(), - metadata.get_jaeger_agent_port(), - )?; return Ok(Some(metadata)); } diff --git a/sbroad-cartridge/src/cartridge/storage.rs b/sbroad-cartridge/src/cartridge/storage.rs index b78942f9d5..f0437e4096 100644 --- a/sbroad-cartridge/src/cartridge/storage.rs +++ b/sbroad-cartridge/src/cartridge/storage.rs @@ -1,5 +1,5 @@ +use crate::cartridge::bucket_count; use crate::cartridge::config::StorageConfiguration; -use crate::cartridge::{bucket_count, update_tracing}; use sbroad::errors::{Action, Entity, SbroadError}; use sbroad::executor::bucket::Buckets; use sbroad::executor::engine::helpers::storage::runtime::unprepare; @@ -143,32 +143,9 @@ impl ConfigurationProvider for StorageRuntime { let storage_size_bytes = usize::try_from(cache_size_bytes) .map_err(|e| SbroadError::Invalid(Entity::Cache, Some(format!("{e}"))))?; - let jaeger_agent_host: LuaFunction<_> = - lua.eval("return get_jaeger_agent_host;").unwrap(); - let jaeger_host: String = match jaeger_agent_host.call() { - Ok(res) => res, - Err(e) => { - error!(Option::from("getting jaeger agent host"), &format!("{e:?}"),); - return Err(SbroadError::LuaError(format!("{e:?}"))); - } - }; - - let jaeger_agent_port: LuaFunction<_> = - lua.eval("return get_jaeger_agent_port;").unwrap(); - let jaeger_port: u16 = match jaeger_agent_port.call() { - Ok(res) => res, - Err(e) => { - error!(Option::from("getting jaeger agent port"), &format!("{e:?}"),); - return Err(SbroadError::LuaError(format!("{e:?}"))); - } - }; - let mut metadata = StorageConfiguration::new(); metadata.storage_capacity = storage_capacity; metadata.storage_size_bytes = storage_size_bytes; - metadata.jaeger_agent_host = jaeger_host; - metadata.jaeger_agent_port = jaeger_port; - update_tracing(&metadata.jaeger_agent_host, metadata.jaeger_agent_port)?; return Ok(Some(metadata)); } diff --git a/sbroad-cartridge/src/init.lua b/sbroad-cartridge/src/init.lua index 6aa48ec816..dc6b80e789 100644 --- a/sbroad-cartridge/src/init.lua +++ b/sbroad-cartridge/src/init.lua @@ -1,4 +1,3 @@ -local cartridge = require('cartridge') local checks = require('checks') local core = require('sbroad.core') @@ -36,32 +35,8 @@ local function calculate_bucket_id(values, space_name) -- luacheck: no unused ar return result end -local function init_statistics () - box.func["libsbroad.init_statistics"]:call({}) -end - -_G.get_jaeger_agent_host = function() - local cfg = cartridge.config_get_readonly() - - if cfg["jaeger_agent_host"] == nil then - return "localhost" - end - - return cfg["jaeger_agent_host"] -end - -_G.get_jaeger_agent_port = function() - local cfg = cartridge.config_get_readonly() - - if cfg["jaeger_agent_port"] == nil then - return 6831 - end - - return cfg["jaeger_agent_port"] -end return { init = init, - init_statistics = init_statistics, calculate_bucket_id = calculate_bucket_id, } diff --git a/sbroad-cartridge/src/router.lua b/sbroad-cartridge/src/router.lua index 17b791d3c1..5458e61e3e 100644 --- a/sbroad-cartridge/src/router.lua +++ b/sbroad-cartridge/src/router.lua @@ -57,27 +57,13 @@ local function invalidate_cache () box.func["libsbroad.invalidate_coordinator_cache"]:call({}) end -local function trace(query, params, context, id) - local has_err, parser_res = pcall( - function() - return box.func["libsbroad.dispatch_query"]:call({ - query, params, context, id, helper.constants.GLOBAL_TRACER }) - end - ) - - if has_err == false then - return nil, parser_res - end - - return helper.format_result(parser_res[1]) -end local function execute(query, params) local has_err, parser_res = pcall( function() return box.func["libsbroad.dispatch_query"]:call({ - query, params, box.NULL, box.NULL, - helper.constants.STAT_TRACER }) + query, params, box.NULL, box.NULL, helper.constants.STAT_TRACER, + }) end ) @@ -92,5 +78,4 @@ return { init=init, invalidate_cache = invalidate_cache, execute = execute, - trace = trace, } diff --git a/sbroad-cartridge/test_app/app/roles/api.lua b/sbroad-cartridge/test_app/app/roles/api.lua index 67f21bbca7..96645796c5 100644 --- a/sbroad-cartridge/test_app/app/roles/api.lua +++ b/sbroad-cartridge/test_app/app/roles/api.lua @@ -1,7 +1,6 @@ local cartridge = require('cartridge') local yaml = require("yaml") local checks = require('checks') -local helper = require('sbroad.helper') _G.set_schema = nil @@ -20,25 +19,8 @@ local function set_schema(new_schema) return nil end -local function const_trace(query, params, context, id) - local has_err, parser_res = pcall( - function() - return box.func["libsbroad.dispatch_query"]:call({ - query, params, context, id, - helper.constants.TEST_TRACER }) - end - ) - - if has_err == false then - return nil, parser_res - end - - return helper.format_result(parser_res[1]) -end - local function init(opts) -- luacheck: no unused args _G.set_schema = set_schema - _G.sbroad.const_trace = const_trace return true end diff --git a/sbroad-cartridge/test_app/test/integration/api_test.lua b/sbroad-cartridge/test_app/test/integration/api_test.lua index af12424efb..c642e1ef94 100644 --- a/sbroad-cartridge/test_app/test/integration/api_test.lua +++ b/sbroad-cartridge/test_app/test/integration/api_test.lua @@ -483,43 +483,3 @@ g.test_uppercase3 = function() rows = {}, }) end - -g.test_trace1 = function() - local api = cluster:server("api-1").net_box - - -- local trace_id = 42 - -- local parent_id = 666 - -- -- 0 - not sampled, 1 - sampled - -- local flags = 0 - - -- -- W3C format - -- local supported_version = 0 - -- local traceparent = string.format("%02x-%032x-%016x-%02x", - -- supported_version, trace_id, parent_id, flags) - -- local carrier = { ["traceparent"] = traceparent, ["tracestate"] = "foo=bar" } - - -- -- Jaegger format - -- local depricated_parent_span = 0 - -- local value = string.format("%032x:%016x:%01x:%01x", trace_id, parent_id, depricated_parent_span, flags) - -- local carrier = { ["uber-trace-id"] = value, ["uberctx-key1"] = "value1" } - - -- No external context, only query id - local carrier = box.NULL - - api:call("box.cfg", {{log_level = 7}}) - local r, err = api:call("sbroad.trace", { [[ - SELECT "id" FROM "BROKEN" WHERE "id" = ? - ]], {1}, carrier, "id1" }) - - t.assert_equals(err, nil) - t.assert_equals(r, { - metadata = { - {name = "id", type = "number"}, - }, - rows = {}, - }) - api:call("box.cfg", {{log_level = 5}}) - - local pattern = 'tracer: Global' - t.assert_equals(helper.grep_log(pattern), pattern) -end diff --git a/sbroad-cartridge/test_app/test/integration/insert_test.lua b/sbroad-cartridge/test_app/test/integration/insert_test.lua index e0d080ca55..ccde31e0d6 100644 --- a/sbroad-cartridge/test_app/test/integration/insert_test.lua +++ b/sbroad-cartridge/test_app/test/integration/insert_test.lua @@ -507,82 +507,6 @@ g.test_insert_on_conflict_do_replace_fails_for_secondary_unique_index = function "TupleFound: Duplicate key exists in unique index \\\\\\\"secondary\\\\\\\"") end -local function assert_cache_hit(query_id) - local storage1 = cluster:server("storage-1-1").net_box - local r, err = storage1:call("box.execute", { [[ - select "span", "query_id" from "_sql_stat" - where "span" = '"tarantool.cache.hit.read.prepared"' and "query_id" = ? ]], { query_id } }) - t.assert_equals(err, nil) - t.assert_equals(r.metadata, { - { name = "span", type = "string" }, - { name = "query_id", type = "string" }, - }) - t.assert_equals(#r.rows, 1) -end - -local function assert_cache_miss(query_id) - local storage1 = cluster:server("storage-1-1").net_box - local r, err = storage1:call("box.execute", { [[ - select "span", "query_id" from "_sql_stat" - where "span" = '"tarantool.cache.miss.read.prepared"' and "query_id" = ? - ]] , { query_id }}) - t.assert_equals(err, nil) - t.assert_equals(r.metadata, { - { name = "span", type = "string" }, - { name = "query_id", type = "string" }, - }) - t.assert_equals(#r.rows, 1) -end - -g.test_cache_works_insert = function() - local api = cluster:server("api-1").net_box - - local query_id, query = "id", [[ - INSERT INTO "space_simple_shard_key" - SELECT "id"+"id", "name" || "name", "sysOp" + "sysOp" - FROM "space_simple_shard_key" - ON CONFLICT DO REPLACE - ]] - local params = { query, {}, nil, query_id } - - local r, err = api:call("sbroad.const_trace", params) - t.assert_equals(err, nil) - t.assert_not_equals(r, {}) - assert_cache_miss(query_id) - - r, err = api:call("sbroad.const_trace", params) - t.assert_equals(err, nil) - t.assert_not_equals(r, {}) - assert_cache_hit(query_id) -end - -g.test_only_executed_part_is_cached = function() - local api = cluster:server("api-1").net_box - - -- test only select part of the insert is being cached - local select_part = [[ - SELECT "id"*"id", "name" || "name", "sysOp" + "sysOp" - FROM "space_simple_shard_key" - ]] - local query_id, query = "id1", string.format([[ - INSERT INTO "space_simple_shard_key" - %s - ON CONFLICT DO REPLACE - ]], select_part) - local params = { query, {}, nil, query_id } - - local r, err = api:call("sbroad.const_trace", params) - t.assert_equals(err, nil) - t.assert_not_equals(r, {}) - assert_cache_miss(query_id) - - params[1] = select_part - r, err = api:call("sbroad.const_trace", params) - t.assert_equals(err, nil) - t.assert_not_equals(r, {}) - assert_cache_hit(query_id) -end - g.test_double_conversion = function() local api = cluster:server("api-1").net_box local query_id, query = "id1", [[ diff --git a/sbroad-core/Cargo.toml b/sbroad-core/Cargo.toml index a9e2988d6a..3f19f6bc70 100644 --- a/sbroad-core/Cargo.toml +++ b/sbroad-core/Cargo.toml @@ -16,9 +16,7 @@ bincode = "1.3" blake3 = "1.3" hash32 = "0.2" itertools = "0.10" -lazy_static = "1.4" opentelemetry = "0.17" -opentelemetry-jaeger = "0.16" pest = "2.0" pest_derive = "2.0" rand = "0.8" @@ -38,3 +36,4 @@ crate-type = ["cdylib", "rlib"] [features] mock = [] +tracing = [] diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs index 9c9adc24e5..022374d1ca 100644 --- a/sbroad-core/src/backend/sql/ir.rs +++ b/sbroad-core/src/backend/sql/ir.rs @@ -15,10 +15,7 @@ use crate::ir::expression::Expression; use crate::ir::operator::Relational; use crate::ir::value::{LuaValue, Value}; use crate::ir::Node; -use crate::otm::{ - child_span, current_id, current_tracer, deserialize_context, inject_context, query_id, - QueryTracer, -}; +use crate::otm::{child_span, current_id, deserialize_context, inject_context, query_id}; use super::space::TmpSpace; use super::tree::SyntaxData; @@ -29,7 +26,8 @@ pub struct PatternWithParams { pub params: Vec<Value>, pub context: Option<HashMap<String, String>>, pub id: Option<String>, - pub tracer: QueryTracer, + // Name of the tracer to use + pub tracer: Option<String>, } impl PartialEq for PatternWithParams { @@ -62,7 +60,7 @@ pub struct EncodedPatternWithParams( Option<Vec<LuaValue>>, Option<HashMap<String, String>>, Option<String>, - String, + Option<String>, ); impl From<PatternWithParams> for EncodedPatternWithParams { @@ -73,7 +71,7 @@ impl From<PatternWithParams> for EncodedPatternWithParams { Some(encoded_params), value.context, value.id, - value.tracer.to_string(), + value.tracer, ) } } @@ -93,7 +91,7 @@ impl TryFrom<EncodedPatternWithParams> for PatternWithParams { params, context: value.2, id: value.3, - tracer: value.4.try_into()?, + tracer: value.4, }; Ok(res) } @@ -104,22 +102,21 @@ impl PatternWithParams { pub fn new(pattern: String, params: Vec<Value>) -> Self { let mut carrier = HashMap::new(); inject_context(&mut carrier); - let tracer = current_tracer(); if carrier.is_empty() { PatternWithParams { pattern, params, context: None, id: None, - tracer, + tracer: None, } } else { PatternWithParams { pattern, params, context: Some(carrier), - id: Some(current_id()), - tracer, + id: current_id(), + tracer: None, } } } diff --git a/sbroad-core/src/core.lua b/sbroad-core/src/core.lua index 447a97245d..2c7aec951a 100644 --- a/sbroad-core/src/core.lua +++ b/sbroad-core/src/core.lua @@ -8,17 +8,8 @@ local function init_bucket() ) end -local function init_statistics() - local exec_fn = helper.module_name() .. ".init_statistics" - box.schema.func.create( - exec_fn, - { if_not_exists = true, language = 'C' } - ) -end - local function init() init_bucket() - init_statistics() end return { diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs index 2d5379a484..7e27a6c5af 100644 --- a/sbroad-core/src/executor.rs +++ b/sbroad-core/src/executor.rs @@ -109,7 +109,6 @@ where /// - Failed to build AST. /// - Failed to build IR plan. /// - Failed to apply optimizing transformations to IR plan. - #[otm_child_span("query.new")] pub fn new(coordinator: &'a C, sql: &str, params: Vec<Value>) -> Result<Self, SbroadError> where C::Cache: Cache<String, Plan>, diff --git a/sbroad-core/src/executor/protocol.rs b/sbroad-core/src/executor/protocol.rs index da6ba3e581..5193651cde 100644 --- a/sbroad-core/src/executor/protocol.rs +++ b/sbroad-core/src/executor/protocol.rs @@ -8,7 +8,7 @@ use crate::debug; use crate::errors::{Action, Entity, SbroadError}; use crate::executor::ir::{ExecutionPlan, QueryType}; use crate::ir::value::Value; -use crate::otm::{current_id, current_tracer, extract_context, inject_context, QueryTracer}; +use crate::otm::{current_id, extract_context, inject_context}; use crate::executor::engine::TableVersionMap; use crate::ir::Options; @@ -70,6 +70,22 @@ impl SchemaInfo { } } +/// Helper struct for storing tracing related information +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct TracingMetadata { + /// Context passed between nodes + pub context: ContextCarrier, + /// Id of a trace + pub trace_id: String, +} + +impl TracingMetadata { + #[must_use] + pub fn new(context: ContextCarrier, trace_id: String) -> Self { + Self { context, trace_id } + } +} + #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub struct RequiredData { // Unique ID for concrete plan represented in a view of BLAKE3 hash. @@ -79,11 +95,9 @@ pub struct RequiredData { pub parameters: Vec<Value>, pub query_type: QueryType, pub can_be_cached: bool, - context: ContextCarrier, - tracer: QueryTracer, - trace_id: String, pub options: Options, pub schema_info: SchemaInfo, + pub tracing_meta: Option<TracingMetadata>, } impl Default for RequiredData { @@ -93,11 +107,9 @@ impl Default for RequiredData { parameters: vec![], query_type: QueryType::DQL, can_be_cached: true, - context: ContextCarrier::empty(), - tracer: QueryTracer::default(), - trace_id: String::new(), options: Options::default(), schema_info: SchemaInfo::default(), + tracing_meta: None, } } } @@ -140,34 +152,21 @@ impl RequiredData { options: Options, schema_info: SchemaInfo, ) -> Self { - let mut carrier = HashMap::new(); - inject_context(&mut carrier); - let tracer = current_tracer(); - let trace_id = current_id(); - if carrier.is_empty() { - RequiredData { - plan_id, - parameters, - query_type, - can_be_cached, - context: ContextCarrier::empty(), - tracer, - trace_id, - options, - schema_info, - } - } else { - RequiredData { - plan_id, - parameters, - query_type, - can_be_cached, - context: ContextCarrier::new(carrier), - tracer, - trace_id, - options, - schema_info, - } + let mut tracing_meta = None; + if let Some(trace_id) = current_id() { + let mut carrier = HashMap::new(); + inject_context(&mut carrier); + tracing_meta = Some(TracingMetadata::new(ContextCarrier::new(carrier), trace_id)); + } + + RequiredData { + plan_id, + parameters, + query_type, + can_be_cached, + options, + schema_info, + tracing_meta, } } @@ -175,20 +174,6 @@ impl RequiredData { pub fn plan_id(&self) -> &str { &self.plan_id } - - #[must_use] - pub fn tracer(&self) -> QueryTracer { - self.tracer.clone() - } - - #[must_use] - pub fn trace_id(&self) -> &str { - &self.trace_id - } - - pub fn extract_context(&mut self) -> Context { - (&mut self.context).into() - } } pub struct EncodedRequiredData(Vec<u8>); diff --git a/sbroad-core/src/lib.rs b/sbroad-core/src/lib.rs index f2cf398854..d41b552643 100644 --- a/sbroad-core/src/lib.rs +++ b/sbroad-core/src/lib.rs @@ -1,7 +1,4 @@ //! Tarantool planner and executor for a distributed SQL. -#[macro_use] -extern crate lazy_static; - #[macro_use] extern crate pest_derive; extern crate core; diff --git a/sbroad-core/src/otm.rs b/sbroad-core/src/otm.rs index 6616ae341a..c0df93b0ed 100644 --- a/sbroad-core/src/otm.rs +++ b/sbroad-core/src/otm.rs @@ -10,43 +10,36 @@ use crate::debug; use ahash::AHashMap; use base64ct::{Base64, Encoding}; -use opentelemetry::global::{get_text_map_propagator, tracer, BoxedTracer}; +use opentelemetry::propagation::TextMapPropagator; use opentelemetry::propagation::{Extractor, Injector}; -use opentelemetry::sdk::trace::{ - Config as SdkConfig, Sampler as SdkSampler, Tracer as SdkTracer, - TracerProvider as SdkTracerProvider, -}; -use opentelemetry::trace::TracerProvider; +use opentelemetry::sdk::propagation::BaggagePropagator; +use opentelemetry::sdk::trace::Span; #[allow(unused_imports)] use opentelemetry::trace::{SpanBuilder, SpanKind, TraceContextExt, Tracer}; #[allow(unused_imports)] use opentelemetry::{Context, KeyValue}; -use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::HashMap; -use std::fmt::{Debug, Display, Formatter}; -use tarantool::tlua::{self, Push}; +use std::fmt::{Debug, Formatter}; pub mod fiber; -pub mod statistics; -#[cfg(not(feature = "mock"))] -mod prod_imports { +#[cfg(all(feature = "tracing", not(feature = "mock")))] +mod tracing_imports { + pub use crate::error; pub use crate::otm::fiber::fiber_id; pub use crate::warn; + pub use opentelemetry::baggage::BaggageExt; pub use tarantool::error::Error as TntError; } -use crate::errors::{Entity, SbroadError}; -#[cfg(not(feature = "mock"))] -use prod_imports::*; +#[cfg(all(feature = "tracing", not(feature = "mock")))] +use tracing_imports::*; pub const OTM_CHAR_LIMIT: usize = 512; -static TRACER_NAME: &str = "libsbroad"; -static RATIO: f64 = 0.01; thread_local!( /// Thread-local storage for the trace information of the current fiber. /// @@ -58,70 +51,29 @@ thread_local!( /// be sure that transaction commit is called after the RefCell is released. static TRACE_MANAGER: RefCell<TraceManager> = RefCell::new(TraceManager::new()) ); -thread_local!(static GLOBAL_TRACER: RefCell<BoxedTracer> = RefCell::new(tracer(TRACER_NAME))); -lazy_static! { - static ref STATISTICS_PROVIDER: SdkTracerProvider = SdkTracerProvider::builder() - .with_span_processor(statistics::StatCollector::new()) - .with_config(SdkConfig::default().with_sampler(SdkSampler::TraceIdRatioBased(RATIO))) - .build(); - #[derive(Debug)] - static ref STATISTICS_TRACER: SdkTracer = STATISTICS_PROVIDER.versioned_tracer("stat", None, None); - /// Like statistic tracer but always create traces. Used only for testing purposes. - static ref TEST_STATISTICS_PROVIDER: SdkTracerProvider = SdkTracerProvider::builder() - .with_span_processor(statistics::StatCollector::new()) - .build(); - static ref TEST_STATISTICS_TRACER: SdkTracer = TEST_STATISTICS_PROVIDER.versioned_tracer("test_stat", None, None); -} -#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Push)] -#[allow(dead_code)] -#[derive(Default)] -pub enum QueryTracer { - /// Sends all metrics to Jaeger agent - Global, - /// Gathers stats about spans and saves them to temporary spaces - /// on each node, but does it only for 1% of the queries. - #[default] - Statistics, - /// Like STAT_TRACER but saves stats for each query. - /// It is used only for tests. - TestStatistics, -} +pub trait QueryTracer { + type Span: opentelemetry::trace::Span; -impl Display for QueryTracer { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let s = match self { - QueryTracer::Global => "global", - QueryTracer::Statistics => "stat", - QueryTracer::TestStatistics => "test_stat", - }; - write!(f, "{s}") - } + fn build_with_context(&self, builder: SpanBuilder, parent_cx: &Context) -> Self::Span; } -impl TryFrom<String> for QueryTracer { - type Error = SbroadError; - - fn try_from(value: String) -> Result<Self, Self::Error> { - let normalized = value.to_lowercase(); - let res = match normalized.as_str() { - "global" => QueryTracer::Global, - "stat" => QueryTracer::Statistics, - "test_stat" => QueryTracer::TestStatistics, - _ => { - return Err(SbroadError::Invalid( - Entity::PatternWithParams, - Some(format!("unknown tracer: {value}")), - )) - } - }; - Ok(res) +impl<T> QueryTracer for T +where + T: Tracer, +{ + type Span = T::Span; + + fn build_with_context(&self, builder: SpanBuilder, parent_cx: &Context) -> Self::Span { + self.build_with_context(builder, parent_cx) } } +pub type TracerRef = &'static dyn QueryTracer<Span = Span>; + #[allow(dead_code)] struct TraceInfo { - tracer: QueryTracer, + tracer: TracerRef, context: Context, id: String, } @@ -129,7 +81,6 @@ struct TraceInfo { impl Debug for TraceInfo { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("TraceInfo") - .field("tracer", &self.tracer) // Context `Debug` trait doesn't print anything useful. .field("context", &self.context.span()) .field("id", &self.id) @@ -139,7 +90,7 @@ impl Debug for TraceInfo { #[allow(dead_code)] impl TraceInfo { - fn new(tracer: QueryTracer, context: Context, id: String) -> Self { + fn new(tracer: TracerRef, context: Context, id: String) -> Self { Self { tracer, context, @@ -147,13 +98,8 @@ impl TraceInfo { } } - fn empty() -> Self { - // TODO: handle disable statistics. - Self::new(QueryTracer::Statistics, Context::new(), String::new()) - } - - fn tracer(&self) -> &QueryTracer { - &self.tracer + fn tracer(&self) -> TracerRef { + self.tracer } fn context(&self) -> &Context { @@ -189,24 +135,14 @@ impl TraceManager { } } +// We need this function as statistics tracer doesn't implement `ObjectSafeTracer` trait. #[inline] #[allow(dead_code)] -// We need this function as statistics tracer doesn't implement `ObjectSafeTracer` trait. -fn build_ctx(tracer: &QueryTracer, sb: SpanBuilder, ctx: &Context) -> Context { - match tracer { - QueryTracer::Statistics => { - let span = STATISTICS_TRACER.build_with_context(sb, ctx); - ctx.with_span(span) - } - QueryTracer::TestStatistics => { - let span = TEST_STATISTICS_TRACER.build_with_context(sb, ctx); - ctx.with_span(span) - } - QueryTracer::Global => { - let span = GLOBAL_TRACER.with(|tracer| tracer.borrow().build_with_context(sb, ctx)); - ctx.with_span(span) - } - } +fn build_ctx(tracer: TracerRef, sb: SpanBuilder, ctx: &Context) -> Context +where +{ + let span = tracer.build_with_context(sb, ctx); + ctx.with_span(span) } #[inline] @@ -216,15 +152,34 @@ pub fn child_span<T, F>(name: &'static str, f: F) -> T where F: FnOnce() -> T, { - #[cfg(not(feature = "mock"))] + #[cfg(all(feature = "tracing", not(feature = "mock")))] { let fid = fiber_id(); - let id = current_id(); - let old_ti = TRACE_MANAGER.with(|tm| { + + let Some(id) = current_id() else { + error!( + Option::from("child span"), + &format!( + "fiber {}, child span {}: missing trace info", + fid, name + ), + ); + return f() + }; + + let Some(old_ti) = TRACE_MANAGER.with(|tm| { tm.borrow_mut() .remove(fid) - .map_or(TraceInfo::empty(), |ti| ti) - }); + }) else { + error!( + Option::from("child span"), + &format!( + "fiber {}, child span {}: missing trace info", + fid, name + ), + ); + return f() + }; let ctx = build_ctx( old_ti.tracer(), SpanBuilder::from_name(name) @@ -232,46 +187,29 @@ where .with_attributes(vec![KeyValue::new("id", id.clone())]), old_ti.context(), ); - let ti = TraceInfo::new(old_ti.tracer().clone(), ctx, id); - TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { - Ok(mut mut_tm) => { - debug!( - Option::from("child span"), - &format!( - "fiber {}, child span {}: insert trace info {:?}", - fid, name, ti - ), - ); - mut_tm.insert(fid, ti); - } - Err(_e) => { - warn!( - Option::from("query span"), - &format!( - "fiber {}, child span {}: failed to insert trace info {:?}, error: {:?}", - fid, name, ti, _e - ), - ); - } + let ti = TraceInfo::new(old_ti.tracer(), ctx, id); + TRACE_MANAGER.with(|tm| { + let mut mut_tm = tm.borrow_mut(); + debug!( + Option::from("child span"), + &format!( + "fiber {}, child span {}: insert trace info {:?}", + fid, name, ti + ), + ); + mut_tm.insert(fid, ti); }); let result = f(); - TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { - Ok(mut mut_tm) => { - debug!( - Option::from("child span"), - &format!("fiber {}, child span {}: restore old trace info {:?}", fid, name, old_ti), - ); - mut_tm.insert(fid, old_ti); - } - Err(_e) => { - warn!( - Option::from("query span"), - &format!( - "fiber {}, child span {}: failed to restore old trace info {:?}, error: {:?}", - fid, name, old_ti, _e - ), - ); - } + TRACE_MANAGER.with(|tm| { + let mut mut_tm = tm.borrow_mut(); + debug!( + Option::from("child span"), + &format!( + "fiber {}, child span {}: restore old trace info {:?}", + fid, name, old_ti + ), + ); + mut_tm.insert(fid, old_ti); }); return result; } @@ -281,19 +219,15 @@ where #[inline] #[allow(unreachable_code)] #[allow(unused_variables)] -pub fn stat_query_span<T, F>(name: &'static str, sql: &str, id: &str, traceable: bool, f: F) -> T +#[allow(clippy::needless_pass_by_value)] +pub fn stat_query_span<T, F>(name: &'static str, sql: &str, id: &str, tracer: TracerRef, f: F) -> T where F: FnOnce() -> T, { - #[cfg(not(feature = "mock"))] + #[cfg(all(feature = "tracing", not(feature = "mock")))] { - let tracer = if traceable { - QueryTracer::TestStatistics - } else { - QueryTracer::Statistics - }; let ctx = Context::new(); - return query_span(name, &id, &tracer, &ctx, sql, f); + return query_span(name, &id, tracer, &ctx, sql, f); } f() } @@ -301,10 +235,11 @@ where #[inline] #[allow(unreachable_code)] #[allow(unused_variables)] +#[allow(clippy::needless_pass_by_value)] pub fn query_span<T, F>( name: &'static str, id: &str, - tracer: &QueryTracer, + tracer: TracerRef, ctx: &Context, sql: &str, f: F, @@ -312,7 +247,7 @@ pub fn query_span<T, F>( where F: FnOnce() -> T, { - #[cfg(not(feature = "mock"))] + #[cfg(all(feature = "tracing", not(feature = "mock")))] { let mut attributes: Vec<KeyValue> = Vec::new(); attributes.push(KeyValue::new("id", id.to_string())); @@ -326,53 +261,33 @@ where let fid = fiber_id(); let ctx = build_ctx( - &tracer, + tracer, SpanBuilder::from_name(name) .with_kind(SpanKind::Server) .with_attributes(attributes), ctx, ); - let ti = TraceInfo::new(tracer.clone(), ctx, id.to_string()); - - TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { - Ok(mut mut_tm) => { - debug!( - Option::from("query span"), - &format!( - "fiber {}, query span {}: insert trace info {:?}", - fid, name, ti - ), - ); - mut_tm.insert(fid, ti); - } - Err(_e) => { - warn!( - Option::from("query span"), - &format!( - "fiber {}, query span {}: failed to insert trace info {:?}, error: {:?}", - fid, name, ti, _e - ), - ); - } + let ti = TraceInfo::new(tracer, ctx, id.to_string()); + + TRACE_MANAGER.with(|tm| { + let mut mut_tm = tm.borrow_mut(); + debug!( + Option::from("query span"), + &format!( + "fiber {}, query span {}: insert trace info {:?}", + fid, name, ti + ), + ); + mut_tm.insert(fid, ti); }); let result = f(); - TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { - Ok(mut mut_tm) => { - debug!( - Option::from("query span"), - &format!("fiber {}, query span {}: remove trace info", fid, name), - ); - mut_tm.remove(fid); - } - Err(_e) => { - warn!( - Option::from("query span"), - &format!( - "fiber {}, query span {}: failed to remove trace info, error: {:?}", - fid, name, _e - ), - ); - } + TRACE_MANAGER.with(|tm| { + let mut mut_tm = tm.borrow_mut(); + debug!( + Option::from("query span"), + &format!("fiber {}, query span {}: remove trace info", fid, name), + ); + mut_tm.remove(fid); }); return result; } @@ -381,23 +296,13 @@ where #[must_use] #[allow(unreachable_code)] -pub fn current_id() -> String { - #[cfg(not(feature = "mock"))] +pub fn current_id() -> Option<String> { + #[cfg(all(feature = "tracing", not(feature = "mock")))] { let fid = fiber_id(); - return TRACE_MANAGER.with(|tm| { - tm.borrow() - .get(fid) - .map_or_else(new_id, |ti| ti.id().to_string()) - }); + return TRACE_MANAGER.with(|tm| tm.borrow().get(fid).map(|ti| ti.id().to_string())); } - new_id() -} - -#[inline] -#[must_use] -fn new_id() -> String { - uuid::Uuid::new_v4().to_string() + None } #[inline] @@ -406,28 +311,9 @@ pub fn query_id(pattern: &str) -> String { Base64::encode_string(blake3::hash(pattern.as_bytes()).to_hex().as_bytes()) } -pub fn update_global_tracer() { - GLOBAL_TRACER.with(|gt| *gt.borrow_mut() = tracer(TRACER_NAME)); -} - -#[must_use] -#[allow(unreachable_code)] -pub fn current_tracer() -> QueryTracer { - #[cfg(not(feature = "mock"))] - { - let fid = fiber_id(); - return TRACE_MANAGER.with(|tm| { - tm.borrow() - .get(fid) - .map_or(QueryTracer::default(), |ti| ti.tracer().clone()) - }); - } - QueryTracer::default() -} - #[allow(unused_variables)] pub fn inject_context(carrier: &mut dyn Injector) { - #[cfg(not(feature = "mock"))] + #[cfg(all(feature = "tracing", not(feature = "mock")))] { let fid = fiber_id(); TRACE_MANAGER.with(|tm| { @@ -439,9 +325,12 @@ pub fn inject_context(carrier: &mut dyn Injector) { ); }, |ti| { - get_text_map_propagator(|propagator| { - propagator.inject_context(ti.context(), carrier); - }); + debug!( + None, + &format!("cx: {:?}, {:?}", ti.context(), ti.context().baggage()) + ); + let propagator = BaggagePropagator::new(); + propagator.inject_context(ti.context(), carrier); }, ); }); @@ -451,9 +340,10 @@ pub fn inject_context(carrier: &mut dyn Injector) { #[allow(unreachable_code)] pub fn extract_context(carrier: &mut dyn Extractor) -> Context { let f = |ctx: &Context| -> Context { - get_text_map_propagator(|propagator| propagator.extract_with_context(ctx, carrier)) + let propagator = BaggagePropagator::new(); + propagator.extract_with_context(ctx, carrier) }; - #[cfg(not(feature = "mock"))] + #[cfg(all(feature = "tracing", not(feature = "mock")))] { let fid = fiber_id(); return TRACE_MANAGER.with(|tm| { @@ -489,12 +379,7 @@ pub fn extract_params<S: ::std::hash::BuildHasher>( context: Option<HashMap<String, String, S>>, id: Option<String>, pattern: &str, - force_trace: bool, -) -> (String, Context, QueryTracer) { - let tracer = match (force_trace, &id, &context) { - (_, None, None) | (false, _, _) => QueryTracer::Statistics, - _ => QueryTracer::Global, - }; +) -> (String, Context) { let id = id.unwrap_or_else(|| query_id(pattern)); let ctx = if let Some(mut carrier) = context { debug!( @@ -510,5 +395,5 @@ pub fn extract_params<S: ::std::hash::BuildHasher>( } else { Context::new() }; - (id, ctx, tracer) + (id, ctx) } diff --git a/sbroad-core/src/otm/fiber.rs b/sbroad-core/src/otm/fiber.rs index b379719576..c3116aebb9 100644 --- a/sbroad-core/src/otm/fiber.rs +++ b/sbroad-core/src/otm/fiber.rs @@ -1,4 +1,4 @@ -#[cfg(not(feature = "mock"))] +#[cfg(all(feature = "tracing", not(feature = "mock")))] pub fn fiber_id() -> u64 { let lua = tarantool::lua_state(); lua.eval("return require('fiber').id()").unwrap() diff --git a/sbroad-core/src/otm/statistics.rs b/sbroad-core/src/otm/statistics.rs deleted file mode 100644 index 51ff0e1b4c..0000000000 --- a/sbroad-core/src/otm/statistics.rs +++ /dev/null @@ -1,165 +0,0 @@ -//! Statistics span processor. -//! -//! This span processor is used to collect statistics about running queries. -//! It uses sampling (1% at the moment) to reduce the overhead of collecting -//! statistics. The results are written to `_sql_query` and `_sql_stat` -//! spaces and evicted by LRU strategy (more details in the `table` and -//! `eviction` modules). -//! -//! The best way to inspect the statistics on any instance is to use local SQL. -//! For example, to get the top 5 most expensive SELECT SQL queries by the -//! average execution time: -//! ```sql -//! select distinct(q."query_text") from "_sql_stat" as s -//! join "_sql_query" as q -//! on s."query_id" = q."query_id" -//! where lower(q."query_text") like 'select%' -//! order by s."sum"/s."count" desc -//! limit 5 -//! ``` -//! -//! Or to get the flame graph of the most expensive query: -//! ```sql -//! with recursive st as ( -//! select * from "_sql_stat" where "query_id" in (select qt."query_id" from qt) -//! and "parent_span" = '' -//! union all -//! select s.* from "_sql_stat" as s, st on s."parent_span" = st."span" -//! and s."query_id" in (select qt."query_id" from qt) -//! ), qt as ( -//! select s."query_id" from "_sql_stat" as s -//! join "_sql_query" as q -//! on s."query_id" = q."query_id" -//! order by s."sum"/s."count" desc -//! limit 1 -//! ) -//! select * from st; -//! ``` - -use eviction::TRACKED_QUERIES; -use opentelemetry::sdk::export::trace::SpanData; -use opentelemetry::sdk::trace::{Span, SpanProcessor}; -use opentelemetry::trace::SpanId; -use opentelemetry::Context; -use std::time::Duration; -use table::{RustMap, SpanName, TarantoolSpace, QUERY, SPAN, STAT}; - -use crate::debug; - -pub mod eviction; -pub mod table; - -#[derive(Debug)] -pub struct StatCollector {} - -impl StatCollector { - #[must_use] - pub fn new() -> Self { - Self {} - } -} - -impl Default for StatCollector { - fn default() -> Self { - Self::new() - } -} - -impl SpanProcessor for StatCollector { - fn on_start(&self, span: &mut Span, _cx: &Context) { - let Some(span_data) = span.exported_data() else { - return - }; - - let id: String = match span_data.attributes.get(&"id".into()) { - Some(id) => id.to_string(), - None => return, - }; - - // We are processing a top level query span. Lets register it. - if let Some(query_sql) = span_data.attributes.get(&"query_sql".into()) { - QUERY.with(|query_space| { - let tuple = (id, query_sql.to_string(), 2); - query_space.borrow_mut().upsert(tuple); - }); - debug!(Option::from("on start"), &format!("query: {query_sql}")); - } - - // Register current span mapping (span id: span name). - SPAN.with(|span_table| { - let key = span_data.span_context.span_id(); - let value = SpanName::from(span_data.name.clone()); - debug!( - Option::from("on start"), - &format!("key: {key:?}, value: {value:?}") - ); - span_table.borrow_mut().push(key, value); - }); - } - - fn on_end(&self, span: SpanData) { - let id: String = match span.attributes.get(&"id".into()) { - Some(id) => id.to_string(), - None => return, - }; - - let parent_span: String = if span.parent_span_id == SpanId::INVALID { - String::new() - } else { - SPAN.with(|span_table| { - let span_table = span_table.borrow(); - span_table - .get(&span.parent_span_id) - .map_or_else(String::new, |span_name| span_name.value().to_string()) - }) - }; - - let duration = match span.end_time.duration_since(span.start_time) { - Ok(duration) => duration, - // The clock may have gone backwards. - Err(_) => Duration::from_secs(0), - } - .as_secs_f64(); - // Update statistics. - STAT.with(|stat_space| { - let tuple = ( - id.to_string(), - String::from(span.name), - parent_span, - duration, - duration, - duration, - 1, - ); - stat_space.borrow_mut().upsert(tuple); - }); - - // Remove current span id to name mapping. - SPAN.with(|span_table| { - span_table.borrow_mut().pop(&span.span_context.span_id()); - }); - - // Unreference the query for the top level query span. - // We don't want to remove the query while some other - // fiber is still collecting statistics for it. - if span.attributes.get(&"query_sql".into()).is_some() { - QUERY.with(|query_space| { - query_space.borrow_mut().delete(&(id.to_string(),)); - }); - } - - // Evict old queries. - TRACKED_QUERIES.with(|tracked_queries| { - let mut tracked_queries = tracked_queries.borrow_mut(); - tracked_queries.push(id).unwrap(); - }); - } - - fn force_flush(&self) -> opentelemetry::trace::TraceResult<()> { - Ok(()) - } - - fn shutdown(&mut self) -> opentelemetry::trace::TraceResult<()> { - Ok(()) - } -} diff --git a/sbroad-core/src/otm/statistics/eviction.rs b/sbroad-core/src/otm/statistics/eviction.rs deleted file mode 100644 index e2e4aa6532..0000000000 --- a/sbroad-core/src/otm/statistics/eviction.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! Statistics eviction module. -//! -//! This module is used to evict statistics from the `__sbroad_stat` -//! and `__sbroad_query` spaces. The eviction is performed by LRU -//! strategy at the threshold of 1000 entries in the `__sbroad_query` -//! space. - -use crate::debug; -use crate::errors::SbroadError; -use crate::executor::lru::{Cache, LRUCache}; -use crate::otm::statistics::table::{TarantoolSpace, QUERY}; -use std::cell::RefCell; - -pub const STATISTICS_CAPACITY: usize = 1000; -thread_local!(pub(super) static TRACKED_QUERIES: RefCell<TrackedQueries> = RefCell::new(TrackedQueries::new())); - -pub struct TrackedQueries { - queries: LRUCache<String, String>, -} - -#[allow(clippy::unnecessary_wraps)] -fn remove_query(query_id: &mut String) -> Result<(), SbroadError> { - QUERY.with(|query_space| { - let mut query_space = query_space.borrow_mut(); - debug!( - Option::from("tracked queries"), - &format!("remove query: {query_id}") - ); - let key = std::mem::take(query_id); - query_space.delete(&(key,)); - }); - Ok(()) -} - -impl Default for TrackedQueries { - fn default() -> Self { - Self::new() - } -} - -impl TrackedQueries { - /// Create a new instance of `TrackedQueries`. - /// - /// # Panics - /// - If the `STATISTICS_CAPACITY` is less than 1 (impossible at the moment). - #[must_use] - pub fn new() -> Self { - Self { - queries: LRUCache::new(STATISTICS_CAPACITY, Some(Box::new(remove_query))).unwrap(), - } - } - - /// Add a new query to the tracked queries. - /// - /// # Errors - /// - Internal error in the eviction function. - pub fn push(&mut self, key: String) -> Result<(), SbroadError> { - self.queries.put(key.clone(), key) - } -} diff --git a/sbroad-core/src/otm/statistics/table.rs b/sbroad-core/src/otm/statistics/table.rs deleted file mode 100644 index e06aab46e0..0000000000 --- a/sbroad-core/src/otm/statistics/table.rs +++ /dev/null @@ -1,598 +0,0 @@ -//! Statistics table module. -//! -//! This module contains the implementation of the statistics spaces used by -//! the `statistics` span processor. There are two spaces and one hash table -//! used to store the statistics: -//! - `_sql_query` space - stores the queries that are currently being executed. -//! Its query id is used as a key for the `_sql_stat` space. -//! - `_sql_stat` space - stores the statistics for the query spans. The spans -//! are stored as a flat tree for each query. -//! - `SpanMap` hash table - stores the mapping between the span id and the span name. -//! The reason is that the span context contains only the span id, so we use -//! this table to save the span name when create the span (and remove it when -//! the span is finished). -//! -//! Keep in mind that the spaces are created in a "lazy" way, i.e. they are created -//! only when statistics processor tries to write to them. As we collect statistics -//! only for 1% of the queries, there is a chance that right after the instance start -//! the statistics spaces still would not exist. Though it is not a problem, such -//! situation can surprise the user. Also we should remember that read-only replicas -//! can not create spaces (even the temporary ones). As a result, the spaces can be -//! "lazily" created on the storages only when some of them are dispatched to the -//! master instance. - -use ahash::AHashMap; -use index::Part; -use opentelemetry::trace::SpanId; -use space::Field; -use std::borrow::Cow; -use std::cell::RefCell; -use tarantool::index::{FieldType, IndexOptions, IndexType, IteratorType}; -use tarantool::space::{Space, SpaceCreateOptions, SpaceEngineType, SpaceType}; -use tarantool::{index, space}; - -use crate::{debug, warn}; - -thread_local!(pub static QUERY: RefCell<QuerySpace> = RefCell::new(QuerySpace::new())); -thread_local!(pub static SPAN: RefCell<SpanMap> = RefCell::new(SpanMap::new())); -thread_local!(pub static STAT: RefCell<StatSpace> = RefCell::new(StatSpace::new())); - -pub trait RustMap { - type Key; - type Value; - - fn new() -> Self; - fn get(&self, key: &Self::Key) -> Option<&Self::Value>; - fn get_mut(&mut self, key: &Self::Key) -> Option<&mut Self::Value>; - fn push(&mut self, key: Self::Key, value: Self::Value); - fn pop(&mut self, key: &Self::Key) -> Option<Self::Value>; -} - -pub struct SpanMap(AHashMap<SpanId, SpanName>); - -impl RustMap for SpanMap { - type Key = SpanId; - type Value = SpanName; - - fn new() -> Self { - Self(AHashMap::new()) - } - - fn get(&self, key: &Self::Key) -> Option<&Self::Value> { - self.0.get(key) - } - - fn get_mut(&mut self, key: &Self::Key) -> Option<&mut Self::Value> { - self.0.get_mut(key) - } - - fn push(&mut self, key: Self::Key, value: Self::Value) { - self.0.insert(key, value); - } - - fn pop(&mut self, key: &Self::Key) -> Option<Self::Value> { - self.0.remove(key) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -pub struct SpanName(String); - -impl SpanName { - #[must_use] - pub fn value(&self) -> &str { - &self.0 - } -} - -impl From<&str> for SpanName { - fn from(s: &str) -> Self { - Self(s.to_string()) - } -} - -impl From<Cow<'static, str>> for SpanName { - fn from(s: Cow<'static, str>) -> Self { - Self(s.to_string()) - } -} - -pub trait TarantoolSpace { - type Key; - type Tuple; - - fn new() -> Self; - fn get(&mut self, key: &Self::Key) -> Option<Self::Tuple>; - fn upsert(&mut self, tuple: Self::Tuple); - fn delete(&mut self, key: &Self::Key) -> Option<Self::Tuple>; - - /// Space name. - fn name(&self) -> &str; - - /// Reference to the cached space. - fn space(&self) -> &Option<Space>; - - /// Mutable reference to the cached space. - fn space_mut(&mut self) -> &mut Option<Space>; - - /// The space can be created only on the master node of the - /// replica set. The read-only replicas can't create spaces - /// (DDL would be replicated from master some moment later). - fn try_create_space(&mut self); - - /// Update the cached space information. This method should - /// be called before any space operation. - fn space_update_cache(&mut self) { - let name = self.name().to_string(); - match self.space_mut() { - Some(space) => { - let space_id = space.id(); - // Find the space to validate the its id. - if let Some(found_space) = Space::find(&name) { - // The space was recreated by user, need to update the cache. - if found_space.id() != space_id { - warn!( - Option::from("space"), - &format!( - "Space {} was found with different id: {} != {}", - &name, - space_id, - found_space.id() - ) - ); - *space = found_space; - } - } else { - // Someone has removed the space from the instance. - // Remove the cached space and try to create it again. - *self.space_mut() = None; - self.try_create_space(); - } - } - None => { - // Try to find the space. - if let Some(space) = Space::find(&name) { - *self.space_mut() = Some(space); - } else { - // Try to create the space. - self.try_create_space(); - } - } - } - } -} - -pub struct QuerySpace { - space: Option<Space>, - name: String, -} - -impl TarantoolSpace for QuerySpace { - type Key = (String,); - type Tuple = (String, String, u64); - - fn new() -> Self { - let mut space = Self { - space: None, - name: "_sql_query".to_string(), - }; - space.space_update_cache(); - space - } - - fn name(&self) -> &str { - &self.name - } - - fn space(&self) -> &Option<Space> { - &self.space - } - - fn space_mut(&mut self) -> &mut Option<Space> { - &mut self.space - } - - fn get(&mut self, key: &Self::Key) -> Option<Self::Tuple> { - self.space_update_cache(); - if let Some(space) = self.space() { - match space.select(IteratorType::Eq, key) { - Ok(mut iter) => { - if let Some(tuple) = iter.next() { - match tuple.decode::<Self::Tuple>() { - Ok(tuple) => return Some(tuple), - Err(_e) => { - warn!( - Option::from("space query"), - &format!("Failed to decode tuple: {}", _e) - ); - } - } - } - } - Err(_e) => { - warn!( - Option::from("space query"), - &format!("Space {} select error: {}", self.name(), _e) - ); - } - } - } - None - } - - fn upsert(&mut self, tuple: Self::Tuple) { - self.space_update_cache(); - if let Some(space) = self.space_mut() { - if let Err(_e) = space.upsert(&tuple, [("+", 2, 1)]) { - warn!( - Option::from("space query"), - &format!("Space {} upsert error: {}", self.name(), _e) - ); - } else { - debug!( - Option::from("space query"), - &format!("increment ref_counter") - ); - } - } - } - - fn delete(&mut self, key: &Self::Key) -> Option<Self::Tuple> { - self.space_update_cache(); - if let Some(space) = self.space_mut() { - match space.select(IteratorType::Eq, key) { - Ok(mut iterator) => { - if let Some(tuple) = iterator.next() { - match tuple.decode::<Self::Tuple>() { - Ok((_, _, ref_counter)) => { - if ref_counter > 0 { - if let Err(_e) = space.upsert(&tuple, [("-", 2, 1)]) { - warn!( - Option::from("space query"), - &format!("Space {} upsert error: {}", self.name(), _e) - ); - } else { - debug!( - Option::from("space query"), - &format!("decrement ref_counter") - ); - } - } else { - let result = match space.delete(key) { - Ok(tuple) => { - if let Some(tuple) = tuple { - match tuple.decode::<Self::Tuple>() { - Ok(tuple) => { - debug!( - Option::from("space query"), - &format!("delete tuple") - ); - Some(tuple) - } - Err(_e) => { - warn!( - Option::from("space query"), - &format!( - "Failed to decode tuple: {}", - _e - ) - ); - None - } - } - } else { - None - } - } - Err(_e) => { - warn!( - Option::from("space query"), - &format!( - "Space {} delete error: {}", - self.name(), - _e - ) - ); - None - } - }; - STAT.with(|stat| { - let query_id = key.0.as_str(); - stat.borrow_mut().delete_query(query_id); - }); - - return result; - } - } - Err(err) => { - let _msg = format!("Can't decode tuple: {err}"); - warn!(Option::from("space query"), &_msg); - } - } - } - } - Err(err) => { - let _msg = &format!("failed to select tuple: {err}"); - warn!(Option::from("space query"), &_msg); - } - } - } - None - } - - fn try_create_space(&mut self) { - let options = SpaceCreateOptions { - format: Some(vec![ - Field::string("query_id"), - Field::string("query_text"), - Field::unsigned("ref_counter"), - ]), - engine: SpaceEngineType::Memtx, - space_type: SpaceType::DataTemporary, - if_not_exists: true, - ..Default::default() - }; - - let pk = IndexOptions { - r#type: Some(IndexType::Tree), - unique: Some(true), - parts: Some(vec![Part::new(1, FieldType::String)]), - if_not_exists: Some(true), - ..Default::default() - }; - - match Space::create(&self.name, &options) { - Ok(table) => { - debug!(Option::from("space query"), "Space created"); - match table.create_index("_sql_query_pk", &pk) { - Ok(_) => { - debug!(Option::from("space query"), "Index _sql_query_pk created"); - let space = self.space_mut(); - *space = Some(table); - } - Err(err) => { - let _msg = &format!("failed to create index _sql_query_pk: {err}"); - warn!(Option::from("space query"), &_msg); - } - } - } - Err(_e) => { - warn!( - Option::from("space query"), - &format!("Failed to create a space: {}", _e) - ); - } - } - } -} - -pub struct StatSpace { - space: Option<Space>, - name: String, -} - -type StatSpaceTuple = (String, String, String, f64, f64, f64, u64); - -impl TarantoolSpace for StatSpace { - type Key = (String, String, String); - type Tuple = StatSpaceTuple; - - fn new() -> Self { - let mut space = Self { - space: None, - name: "_sql_stat".to_string(), - }; - space.space_update_cache(); - space - } - - fn name(&self) -> &str { - &self.name - } - - fn space(&self) -> &Option<Space> { - &self.space - } - - fn space_mut(&mut self) -> &mut Option<Space> { - &mut self.space - } - - fn try_create_space(&mut self) { - let options = SpaceCreateOptions { - format: Some(vec![ - Field::string("query_id"), - Field::string("span"), - Field::string("parent_span"), - Field::double("min"), - Field::double("max"), - Field::double("sum"), - Field::unsigned("count"), - ]), - engine: SpaceEngineType::Memtx, - space_type: SpaceType::DataTemporary, - if_not_exists: true, - ..Default::default() - }; - - let pk = IndexOptions { - r#type: Some(IndexType::Tree), - unique: Some(true), - parts: Some(vec![ - Part::new(1, FieldType::String), - Part::new(2, FieldType::String), - Part::new(3, FieldType::String), - ]), - if_not_exists: Some(true), - ..Default::default() - }; - - match Space::create(&self.name, &options) { - Ok(table) => { - debug!(Option::from("space stat"), "Space created"); - match table.create_index("_sql_stat_pk", &pk) { - Ok(_) => { - debug!(Option::from("space stat"), "Index _sql_stat_pk created"); - let space = self.space_mut(); - *space = Some(table); - } - Err(err) => { - let _msg = &format!("failed to create index _sql_stat_pk: {err}"); - warn!(Option::from("space stat"), &_msg); - } - } - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Failed to create a space: {}", _e) - ); - } - } - } - - fn get(&mut self, key: &Self::Key) -> Option<Self::Tuple> { - self.space_update_cache(); - if let Some(space) = self.space() { - match space.select(IteratorType::Eq, key) { - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Space {} select error: {}", self.name(), _e) - ); - } - Ok(mut iter) => { - if let Some(tuple) = iter.next() { - match tuple.decode::<Self::Tuple>() { - Ok(tuple) => { - debug!(Option::from("space stat"), "get tuple"); - return Some(tuple); - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Failed to decode tuple: {}", _e) - ); - } - } - } - } - } - } - None - } - - fn upsert(&mut self, tuple: Self::Tuple) { - self.space_update_cache(); - match self.get(&(tuple.0.clone(), tuple.1.clone(), tuple.2.clone())) { - None => { - debug!(Option::from("space stat"), &format!("insert {:?}", tuple)); - if let Some(space) = self.space_mut() { - match space.insert(&tuple) { - Ok(_) => { - debug!(Option::from("space stat"), "inserted"); - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Space {} insert error: {}", self.name(), _e) - ); - } - } - } - } - Some(mut found_tuple) => { - debug!( - Option::from("space stat"), - &format!("update {:?} with {:?}", found_tuple, tuple) - ); - found_tuple.3 = tuple.3.min(found_tuple.3); - found_tuple.4 = tuple.4.max(found_tuple.4); - found_tuple.5 += tuple.5; - found_tuple.6 += tuple.6; - if let Some(space) = self.space_mut() { - match space.replace(&found_tuple) { - Ok(_) => { - debug!(Option::from("space stat"), "replaced"); - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Space {} update error: {}", self.name(), _e) - ); - } - } - } - } - } - } - - fn delete(&mut self, key: &Self::Key) -> Option<Self::Tuple> { - self.space_update_cache(); - if let Some(space) = self.space_mut() { - match space.delete(&key) { - Ok(tuple) => { - if let Some(tuple) = tuple { - match tuple.decode::<Self::Tuple>() { - Ok(tuple) => { - debug!(Option::from("space stat"), "deleted"); - return Some(tuple); - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Failed to decode tuple: {}", _e) - ); - } - } - } - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Space {} delete error: {}", self.name(), _e) - ); - } - } - } - None - } -} - -impl StatSpace { - fn delete_query(&mut self, query_id: &str) { - self.space_update_cache(); - let keys = if let Some(space) = self.space() { - let index_scan = match space.select(IteratorType::Eq, &(query_id,)) { - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Space {} select error: {}", self.name(), _e) - ); - return; - } - Ok(iter) => iter, - }; - index_scan - .filter_map(|tuple| match tuple.decode::<StatSpaceTuple>() { - Ok(tuple) => { - debug!(Option::from("space stat"), "get tuple"); - Some((tuple.0, tuple.1, tuple.2)) - } - Err(_e) => { - warn!( - Option::from("space stat"), - &format!("Failed to decode tuple: {}", _e) - ); - None - } - }) - .collect::<Vec<_>>() - } else { - return; - }; - for key in keys { - self.delete(&key); - } - } -} -- GitLab