diff --git a/sbroad-cartridge/src/api/exec_query.rs b/sbroad-cartridge/src/api/exec_query.rs index 144709698304f445dc48cdfbab01a1bc5c852471..89606158b571a779463f557b77208d15fbd2234a 100644 --- a/sbroad-cartridge/src/api/exec_query.rs +++ b/sbroad-cartridge/src/api/exec_query.rs @@ -18,50 +18,66 @@ use sbroad::{debug, error}; pub extern "C" fn dispatch_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int { let lua_params = match PatternWithParams::try_from(args) { Ok(params) => params, - Err(e) => return tarantool_error(&e.to_string()), + Err(e) => { + error!(Option::from("dispatch_query"), &format!("Error: {}", e)); + return tarantool_error(&e.to_string()); + } }; // We initialize the global tracer on every configuration update. // As a side effect, we can't trace load_config() call itself (sic!). let ret_code = load_config(&COORDINATOR_ENGINE); - let (id, ctx, tracer) = extract_params(lua_params.context, lua_params.id); - - query_span("api.router", &id, tracer, &ctx, &lua_params.pattern, || { - if ret_code != 0 { - return ret_code; - } - COORDINATOR_ENGINE.with(|engine| { - let runtime = match engine.try_borrow() { - Ok(runtime) => runtime, - Err(e) => { - return tarantool_error(&format!( - "Failed to borrow the runtime while dispatching the query: {}", - e - )); - } - }; - let mut query = match Query::new(&*runtime, &lua_params.pattern, lua_params.params) { - Ok(q) => q, - Err(e) => { - error!(Option::from("query dispatch"), &format!("{:?}", e)); - return tarantool_error(&e.to_string()); - } - }; + let (id, ctx, tracer) = extract_params( + lua_params.context, + lua_params.id, + &lua_params.pattern, + lua_params.force_trace, + ); - match query.dispatch() { - Ok(result) => child_span("tarantool.tuple.return", || { - if let Some(tuple) = (&*result).downcast_ref::<Tuple>() { - f_ctx.return_tuple(tuple).unwrap(); - 0 - } else { - tarantool_error("Unsupported result type") - } - }), - Err(e) => tarantool_error(&e.to_string()), + query_span( + "\"api.router\"", + &id, 
+ &tracer, + &ctx, + &lua_params.pattern, + || { + if ret_code != 0 { + return ret_code; } - }) - }) + COORDINATOR_ENGINE.with(|engine| { + let runtime = match engine.try_borrow() { + Ok(runtime) => runtime, + Err(e) => { + return tarantool_error(&format!( + "Failed to borrow the runtime while dispatching the query: {}", + e + )); + } + }; + let mut query = match Query::new(&*runtime, &lua_params.pattern, lua_params.params) + { + Ok(q) => q, + Err(e) => { + error!(Option::from("query dispatch"), &format!("{:?}", e)); + return tarantool_error(&e.to_string()); + } + }; + + match query.dispatch() { + Ok(result) => child_span("\"tarantool.tuple.return\"", || { + if let Some(tuple) = (&*result).downcast_ref::<Tuple>() { + f_ctx.return_tuple(tuple).unwrap(); + 0 + } else { + tarantool_error("Unsupported result type") + } + }), + Err(e) => tarantool_error(&e.to_string()), + } + }) + }, + ) } #[derive(Debug)] @@ -71,6 +87,7 @@ struct ExecuteQueryParams { context: Option<HashMap<String, String>>, id: Option<String>, is_data_modifier: bool, + force_trace: bool, } impl TryFrom<FunctionArgs> for ExecuteQueryParams { @@ -102,6 +119,7 @@ impl<'de> Deserialize<'de> for ExecuteQueryParams { Option<HashMap<String, String>>, Option<String>, bool, + bool, ); let struct_helper = StructHelper::deserialize(deserializer)?; @@ -112,6 +130,7 @@ impl<'de> Deserialize<'de> for ExecuteQueryParams { context: struct_helper.2, id: struct_helper.3, is_data_modifier: struct_helper.4, + force_trace: struct_helper.5, }) } } @@ -128,11 +147,16 @@ pub extern "C" fn execute_query(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int return ret_code; } - let (id, ctx, tracer) = extract_params(lua_params.context, lua_params.id); + let (id, ctx, tracer) = extract_params( + lua_params.context, + lua_params.id, + &lua_params.pattern, + lua_params.force_trace, + ); query_span( - "api.storage", + "\"api.storage\"", &id, - tracer, + &tracer, &ctx, &lua_params.pattern, || { diff --git 
a/sbroad-cartridge/src/router.lua b/sbroad-cartridge/src/router.lua index 8bf48746d6ed329b9979e1d5afcca1a05d000c26..00a069296c7bd50ef13b4afb86babc672e20837c 100644 --- a/sbroad-cartridge/src/router.lua +++ b/sbroad-cartridge/src/router.lua @@ -61,7 +61,7 @@ _G.read_on_some = function(tbl_rs_query, waiting_timeout) local replica = vshard.router.routeall()[rs_uuid] local future, err = replica:callbre( "libsbroad.execute_query", - { query['pattern'], query['params'], query['context'], query['id'], false }, + { query['pattern'], query['params'], query['context'], query['id'], false, query['force_trace'] }, { is_async = true } ) if err ~= nil then @@ -98,7 +98,7 @@ _G.write_on_some = function(tbl_rs_query, waiting_timeout) local replica = vshard.router.routeall()[rs_uuid] local future, err = replica:callrw( "libsbroad.execute_query", - { query['pattern'], query['params'], query['context'], query['id'], true }, + { query['pattern'], query['params'], query['context'], query['id'], true, query['force_trace'] }, { is_async = true } ) if err ~= nil then @@ -133,7 +133,7 @@ _G.read_on_all = function(query, waiting_timeout) for _, replica in pairs(replicas) do local future, err = replica:callbre( "libsbroad.execute_query", - { query['pattern'], query['params'], query['context'], query['id'], false }, + { query['pattern'], query['params'], query['context'], query['id'], false, query['force_trace'] }, { is_async = true } ) if err ~= nil then @@ -170,7 +170,7 @@ _G.write_on_all = function(query, waiting_timeout) for _, replica in pairs(replicas) do local future, err = replica:callrw( "libsbroad.execute_query", - { query['pattern'], query['params'], query['context'], query['id'], true }, + { query['pattern'], query['params'], query['context'], query['id'], true, query['force_trace'] }, { is_async = true } ) if err ~= nil then @@ -254,7 +254,7 @@ end local function trace(query, params, context, id) local has_err, parser_res = pcall( function() - return 
box.func["libsbroad.dispatch_query"]:call({ query, params, context, id }) + return box.func["libsbroad.dispatch_query"]:call({ query, params, context, id, false }) end ) diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs index b8c61d0b44a2710855d23c4f4ed97f4e8c43a179..7697bdf7a9d09578d41f1440e34848be0b95622b 100644 --- a/sbroad-core/src/backend/sql/ir.rs +++ b/sbroad-core/src/backend/sql/ir.rs @@ -14,7 +14,7 @@ use crate::ir::expression::Expression; use crate::ir::operator::Relational; use crate::ir::value::Value; use crate::ir::Node; -use crate::otm::{child_span, current_id, inject_context}; +use crate::otm::{child_span, current_id, force_trace, inject_context}; use super::tree::SyntaxData; @@ -24,6 +24,7 @@ pub struct PatternWithParams { pub params: Vec<Value>, pub context: Option<HashMap<String, String>>, pub id: Option<String>, + pub force_trace: bool, } impl PartialEq for PatternWithParams { @@ -63,6 +64,7 @@ impl<'de> Deserialize<'de> for PatternWithParams { Vec<Value>, Option<HashMap<String, String>>, Option<String>, + bool, ); let struct_helper = StructHelper::deserialize(deserializer)?; @@ -72,6 +74,7 @@ impl<'de> Deserialize<'de> for PatternWithParams { params: struct_helper.1, context: struct_helper.2, id: struct_helper.3, + force_trace: struct_helper.4, }) } } @@ -81,12 +84,14 @@ impl PatternWithParams { pub fn new(pattern: String, params: Vec<Value>) -> Self { let mut carrier = HashMap::new(); inject_context(&mut carrier); + let force_trace = force_trace(); if carrier.is_empty() { PatternWithParams { pattern, params, context: None, id: None, + force_trace, } } else { PatternWithParams { @@ -94,6 +99,7 @@ impl PatternWithParams { params, context: Some(carrier), id: Some(current_id()), + force_trace, } } } @@ -117,7 +123,7 @@ impl ExecutionPlan { nodes: &[&SyntaxData], buckets: &Buckets, ) -> Result<PatternWithParams, QueryPlannerError> { - let (sql, params) = child_span("syntax.ordered.sql", || { + let (sql, params) = 
child_span("\"syntax.ordered.sql\"", || { let mut params: Vec<Value> = Vec::new(); let mut sql = String::new(); diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs index 70f0fac86701f1d349bcf71044761928d62d2341..fe0e3e2033dc475dec9db3fdce366184867fbf2b 100644 --- a/sbroad-core/src/executor.rs +++ b/sbroad-core/src/executor.rs @@ -41,10 +41,8 @@ use crate::ir::relation::{Column, ColumnRole, Type}; use crate::ir::transformation::redistribution::{DataGeneration, MotionKey, MotionPolicy, Target}; use crate::ir::value::Value; use crate::ir::Plan; -use crate::otm::child_span; -use base64ct::{Base64, Encoding}; +use crate::otm::{child_span, query_id}; use sbroad_proc::otm_child_span; -use sha2::{Digest, Sha256}; pub mod bucket; pub mod engine; @@ -101,8 +99,7 @@ where C::Cache: Cache<String, Plan>, C::ParseTree: Ast, { - let hash = Sha256::digest(sql.as_bytes()); - let key = Base64::encode_string(&hash); + let key = query_id(sql); let ir_cache = coordinator.ir_cache(); let mut plan = Plan::new(); diff --git a/sbroad-core/src/otm.rs b/sbroad-core/src/otm.rs index 6f5b3fa94d979d353bf0ac32e1b97bb76f24202d..53ad79110d9deb911ae1f7810fbe7e635178ffac 100644 --- a/sbroad-core/src/otm.rs +++ b/sbroad-core/src/otm.rs @@ -1,38 +1,73 @@ //! Opentelemetry module +//! +//! This module contains the opentelemetry instrumentation for the sbroad library. +//! There are two main use case for it: +//! - tracing of some exact query (global tracer exports spans to the jaeger) +//! - query execution statistics sampled from 1% of all queries (statistics tracer +//! 
writing to the temporary spaces) use crate::debug; use ahash::AHashMap; +use base64ct::{Base64, Encoding}; use opentelemetry::global::{get_text_map_propagator, tracer, BoxedTracer}; use opentelemetry::propagation::{Extractor, Injector}; -use opentelemetry::trace::noop::NoopTracer; +use opentelemetry::sdk::trace::{ + Config as SdkConfig, Sampler as SdkSampler, Tracer as SdkTracer, + TracerProvider as SdkTracerProvider, +}; +use opentelemetry::trace::TracerProvider; #[allow(unused_imports)] use opentelemetry::trace::{SpanBuilder, SpanKind, TraceContextExt, Tracer}; #[allow(unused_imports)] use opentelemetry::{Context, KeyValue}; +use sha2::{Digest, Sha256}; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; mod fiber; +pub mod statistics; +#[cfg(not(feature = "mock"))] +use crate::warn; #[cfg(not(feature = "mock"))] use fiber::fiber_id; +#[cfg(not(feature = "mock"))] +use tarantool::error::Error as TntError; +#[cfg(not(feature = "mock"))] +use tarantool::transaction::start_transaction; static TRACER_NAME: &str = "libsbroad"; -thread_local!(static TRACE_MANAGER: RefCell<TraceManager> = RefCell::new(TraceManager::new())); +static RATIO: f64 = 0.01; +thread_local!( + /// Thread-local storage for the trace information of the current fiber. + /// + /// Pay attention, that all mutable accesses to this variable should be + /// wrapped with Tarantool transaction. The reason is that statistics + /// tracer can create temporary tables on the instance. As a result, + /// Tarantool yields the fiber while the RefCell is still mutably borrowed. + /// An access to TRACE_MANAGER on a new fiber leads to panic. So we have to + /// be sure that transaction commit is called after the RefCell is released. + static TRACE_MANAGER: RefCell<TraceManager> = RefCell::new(TraceManager::new()) +); thread_local!(static GLOBAL_TRACER: RefCell<BoxedTracer> = RefCell::new(tracer(TRACER_NAME))); lazy_static! 
{ - static ref NOOP_TRACER: NoopTracer = NoopTracer::new(); + static ref STATISTICS_PROVIDER: SdkTracerProvider = SdkTracerProvider::builder() + .with_span_processor(statistics::StatCollector::new()) + .with_config(SdkConfig::default().with_sampler(SdkSampler::TraceIdRatioBased(RATIO))) + .build(); + #[derive(Debug)] + static ref STATISTICS_TRACER: SdkTracer = STATISTICS_PROVIDER.versioned_tracer("stat", None, None); } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] #[allow(dead_code)] pub enum QueryTracer { Global, - Noop, + Statistics, } #[allow(dead_code)] @@ -48,6 +83,7 @@ impl Debug for TraceInfo { .field("tracer", &self.tracer) // Context `Debug` trait doesn't print anything useful. .field("context", &self.context.span()) + .field("id", &self.id) .finish() } } @@ -63,7 +99,8 @@ impl TraceInfo { } fn empty() -> Self { - Self::new(QueryTracer::Noop, Context::new(), "".to_string()) + // TODO: handle disable statistics. + Self::new(QueryTracer::Statistics, Context::new(), "".to_string()) } fn tracer(&self) -> &QueryTracer { @@ -105,11 +142,11 @@ impl TraceManager { #[inline] #[allow(dead_code)] -// We need this function as NoopTracer doesn't implement `ObjectSafeTracer` trait. +// We need this function as statistics tracer doesn't implement `ObjectSafeTracer` trait. 
fn build_ctx(tracer: &QueryTracer, sb: SpanBuilder, ctx: &Context) -> Context { match tracer { - QueryTracer::Noop => { - let span = NOOP_TRACER.build_with_context(sb, ctx); + QueryTracer::Statistics => { + let span = STATISTICS_TRACER.build_with_context(sb, ctx); ctx.with_span(span) } QueryTracer::Global => { @@ -129,12 +166,15 @@ where #[cfg(not(feature = "mock"))] { let fid = fiber_id(); - let old_ti = TRACE_MANAGER.with(|tm| { - tm.borrow_mut() - .remove(fid) - .map_or(TraceInfo::empty(), |ti| ti) - }); let id = current_id(); + let old_ti = start_transaction(|| -> Result<TraceInfo, TntError> { + Ok(TRACE_MANAGER.with(|tm| { + tm.borrow_mut() + .remove(fid) + .map_or(TraceInfo::empty(), |ti| ti) + })) + }) + .unwrap(); let ctx = build_ctx( old_ti.tracer(), SpanBuilder::from_name(name) @@ -143,23 +183,53 @@ where old_ti.context(), ); let ti = TraceInfo::new(old_ti.tracer().clone(), ctx, id); - debug!( - Option::from("child span"), - &format!( - "fiber {}, child span {}: insert new trace info {:?}", - fid, name, ti - ), - ); - TRACE_MANAGER.with(|tm| tm.borrow_mut().insert(fid, ti)); + start_transaction(|| -> Result<(), TntError> { + TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { + Ok(mut mut_tm) => { + debug!( + Option::from("child span"), + &format!( + "fiber {}, child span {}: insert trace info {:?}", + fid, name, ti + ), + ); + mut_tm.insert(fid, ti); + } + Err(_e) => { + warn!( + Option::from("query span"), + &format!( + "fiber {}, child span {}: failed to insert trace info {:?}, error: {:?}", + fid, name, ti, _e + ), + ); + } + }); + Ok(()) + }) + .unwrap(); let result = f(); - debug!( - Option::from("child span"), - &format!( - "fiber {}, child span {}: restore old trace info {:?}", - fid, name, old_ti - ), - ); - TRACE_MANAGER.with(|tm| tm.borrow_mut().insert(fid, old_ti)); + start_transaction(|| -> Result<(), TntError> { + TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { + Ok(mut mut_tm) => { + debug!( + Option::from("child span"), + 
&format!("fiber {}, child span {}: restore old trace info {:?}", fid, name, old_ti), + ); + mut_tm.insert(fid, old_ti); + } + Err(_e) => { + warn!( + Option::from("query span"), + &format!( + "fiber {}, child span {}: failed to restore old trace info {:?}, error: {:?}", + fid, name, old_ti, _e + ), + ); + } + }); + Ok(()) + }).unwrap(); return result; } f() @@ -171,7 +241,7 @@ where pub fn query_span<T, F>( name: &'static str, id: &str, - tracer: QueryTracer, + tracer: &QueryTracer, ctx: &Context, sql: &str, f: F, @@ -187,8 +257,8 @@ where "query_sql", sql.char_indices() // Maximum number of bytes per a single name-value pair: 4096. - // UTF-8 character can be up to 4 bytes long, `query_id` is 8 bytes long. - .filter_map(|(i, c)| if i <= 4084 { Some(c) } else { None }) + // UTF-8 character can be up to 4 bytes long, `query_sql` is 9 bytes long. + .filter_map(|(i, c)| if i <= 4083 { Some(c) } else { None }) .collect::<String>(), )); @@ -200,22 +270,55 @@ where .with_attributes(attributes), ctx, ); - let ti = TraceInfo::new(tracer, ctx, id.to_string()); + let ti = TraceInfo::new(tracer.clone(), ctx, id.to_string()); - debug!( - Option::from("query span"), - &format!( - "fiber {}, query span {}: insert trace info {:?}", - fid, name, ti - ), - ); - TRACE_MANAGER.with(|tm| tm.borrow_mut().insert(fid, ti)); + start_transaction(|| -> Result<(), TntError> { + TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { + Ok(mut mut_tm) => { + debug!( + Option::from("query span"), + &format!( + "fiber {}, query span {}: insert trace info {:?}", + fid, name, ti + ), + ); + mut_tm.insert(fid, ti); + } + Err(_e) => { + warn!( + Option::from("query span"), + &format!( + "fiber {}, query span {}: failed to insert trace info {:?}, error: {:?}", + fid, name, ti, _e + ), + ); + } + }); + Ok(()) + }).unwrap(); let result = f(); - debug!( - Option::from("query span"), - &format!("fiber {}, query span {}: remove trace info", fid, name), - ); - TRACE_MANAGER.with(|tm| 
tm.borrow_mut().remove(fid)); + start_transaction(|| -> Result<(), TntError> { + TRACE_MANAGER.with(|tm| match tm.try_borrow_mut() { + Ok(mut mut_tm) => { + debug!( + Option::from("query span"), + &format!("fiber {}, query span {}: remove trace info", fid, name), + ); + mut_tm.remove(fid); + } + Err(_e) => { + warn!( + Option::from("query span"), + &format!( + "fiber {}, query span {}: failed to remove trace info, error: {:?}", + fid, name, _e + ), + ); + } + }); + Ok(()) + }) + .unwrap(); return result; } f() @@ -238,14 +341,36 @@ pub fn current_id() -> String { #[inline] #[must_use] -pub fn new_id() -> String { +fn new_id() -> String { uuid::Uuid::new_v4().to_string() } +#[inline] +#[must_use] +pub fn query_id(pattern: &str) -> String { + let hash = Sha256::digest(pattern.as_bytes()); + Base64::encode_string(&hash) +} + pub fn update_global_tracer() { GLOBAL_TRACER.with(|gt| *gt.borrow_mut() = tracer(TRACER_NAME)); } +#[must_use] +#[allow(unreachable_code)] +pub fn force_trace() -> bool { + #[cfg(not(feature = "mock"))] + { + let fid = fiber_id(); + return TRACE_MANAGER.with(|tm| { + tm.borrow() + .get(fid) + .map_or(false, |ti| ti.tracer() == &QueryTracer::Global) + }); + } + false +} + #[allow(unused_variables)] pub fn inject_context(carrier: &mut dyn Injector) { #[cfg(not(feature = "mock"))] @@ -290,13 +415,18 @@ pub fn extract_context(carrier: &mut dyn Extractor) -> Context { pub fn extract_params<S: ::std::hash::BuildHasher>( context: Option<HashMap<String, String, S>>, id: Option<String>, + pattern: &str, + force_trace: bool, ) -> (String, Context, QueryTracer) { - let tracer = if let (None, None) = (&id, &context) { - QueryTracer::Noop + let tracer = match (force_trace, &id, &context) { + (_, None, None) | (false, _, _) => QueryTracer::Statistics, + _ => QueryTracer::Global, + }; + let id = if let Some(id) = id { + id } else { - QueryTracer::Global + query_id(pattern) }; - let id = if let Some(id) = id { id } else { new_id() }; let ctx = if let Some(mut 
carrier) = context { debug!( Option::from("parameters extraction"), diff --git a/sbroad-core/src/otm/statistics.rs b/sbroad-core/src/otm/statistics.rs new file mode 100644 index 0000000000000000000000000000000000000000..54e0a58c2fdd1ce918b5fd8f9ededb8cdb5ea879 --- /dev/null +++ b/sbroad-core/src/otm/statistics.rs @@ -0,0 +1,166 @@ +//! Statistics span processor. +//! +//! This span processor is used to collect statistics about running queries. +//! It uses sampling (1% at the moment) to reduce the overhead of collecting +//! statistics. The results are written to `__sbroad_query` and `__sbroad_stat` +//! spaces and evicted by LRU strategy (more details in the `table` and +//! `eviction` modules). +//! +//! The best way to inspect the statistics on any instance is to use local SQL. +//! For example, to get the top 5 most expensive SELECT SQL queries by the +//! average execution time: +//! ```sql +//! select distinct(q.query_text) from __sbroad_stat as s +//! join __sbroad_query as q +//! on s.query_id = q.query_id +//! where lower(q.query_text) like 'select%' +//! order by s.sum/s.count desc +//! limit 5 +//! ``` +//! +//! Or to get the flame graph of the most expensive query: +//! ```sql +//! with recursive st as ( +//! select * from __sbroad_stat where query_id in (select qt.query_id from qt) +//! and parent_span = '' +//! union all +//! select s.* from __sbroad_stat as s, st on s.parent_span = st.span +//! and s.query_id in (select qt.query_id from qt) +//! ), qt as ( +//! select s.query_id from __sbroad_stat as s +//! join __sbroad_query as q +//! on s.query_id = q.query_id +//! order by s.sum/s.count desc +//! limit 1 +//! ) +//! select * from st; +//! 
``` + +use eviction::TRACKED_QUERIES; +use opentelemetry::sdk::export::trace::SpanData; +use opentelemetry::sdk::trace::{Span, SpanProcessor}; +use opentelemetry::trace::SpanId; +use opentelemetry::Context; +use std::time::Duration; +use table::{RustMap, SpanName, TarantoolSpace, QUERY, SPAN, STAT}; + +use crate::debug; + +pub mod eviction; +pub mod table; + +#[derive(Debug)] +pub struct StatCollector {} + +impl StatCollector { + #[must_use] + pub fn new() -> Self { + Self {} + } +} + +impl Default for StatCollector { + fn default() -> Self { + Self::new() + } +} + +impl SpanProcessor for StatCollector { + fn on_start(&self, span: &mut Span, _cx: &Context) { + let span_data = match span.exported_data() { + Some(span_data) => span_data, + None => return, + }; + + let id: String = match span_data.attributes.get(&"id".into()) { + Some(id) => id.to_string(), + None => return, + }; + + // We are processing a top level query span. Lets register it. + if let Some(query_sql) = span_data.attributes.get(&"query_sql".into()) { + QUERY.with(|query_space| { + let tuple = (id.to_string(), query_sql.to_string(), 2); + query_space.borrow_mut().upsert(tuple); + }); + debug!(Option::from("on start"), &format!("query: {}", query_sql)); + } + + // Register current span mapping (span id: span name). 
+ SPAN.with(|span_table| { + let key = span_data.span_context.span_id(); + let value = SpanName::from(span_data.name.clone()); + debug!( + Option::from("on start"), + &format!("key: {:?}, value: {:?}", key, value) + ); + span_table.borrow_mut().push(key, value); + }); + } + + fn on_end(&self, span: SpanData) { + let id: String = match span.attributes.get(&"id".into()) { + Some(id) => id.to_string(), + None => return, + }; + + let parent_span: String = if span.parent_span_id == SpanId::INVALID { + "".to_string() + } else { + SPAN.with(|span_table| { + let span_table = span_table.borrow(); + span_table + .get(&span.parent_span_id) + .map_or_else(|| "".to_string(), |span_name| span_name.value().to_string()) + }) + }; + + let duration = match span.end_time.duration_since(span.start_time) { + Ok(duration) => duration, + // The clock may have gone backwards. + Err(_) => Duration::from_secs(0), + } + .as_secs_f64(); + // Update statistics. + STAT.with(|stat_space| { + let tuple = ( + id.to_string(), + String::from(span.name), + parent_span, + duration, + duration, + duration, + 1, + ); + stat_space.borrow_mut().upsert(tuple); + }); + + // Remove current span id to name mapping. + SPAN.with(|span_table| { + span_table.borrow_mut().pop(&span.span_context.span_id()); + }); + + // Unreference the query for the top level query span. + // We don't want to remove the query while some other + // fiber is still collecting statistics for it. + if span.attributes.get(&"query_sql".into()).is_some() { + QUERY.with(|query_space| { + query_space.borrow_mut().delete(&(id.to_string(),)); + }); + } + + // Evict old queries. 
+ TRACKED_QUERIES.with(|tracked_queries| { + let mut tracked_queries = tracked_queries.borrow_mut(); + tracked_queries.push(id).unwrap(); + }); + } + + fn force_flush(&self) -> opentelemetry::trace::TraceResult<()> { + Ok(()) + } + + fn shutdown(&mut self) -> opentelemetry::trace::TraceResult<()> { + Ok(()) + } +} diff --git a/sbroad-core/src/otm/statistics/eviction.rs b/sbroad-core/src/otm/statistics/eviction.rs new file mode 100644 index 0000000000000000000000000000000000000000..bb1bfe76c9a83fd5659c2143e0d886565a12a079 --- /dev/null +++ b/sbroad-core/src/otm/statistics/eviction.rs @@ -0,0 +1,60 @@ +//! Statistics eviction module. +//! +//! This module is used to evict statistics from the `__sbroad_stat` +//! and `__sbroad_query` spaces. The eviction is performed by LRU +//! strategy at the threshold of 1000 entries in the `__sbroad_query` +//! space. + +use crate::debug; +use crate::errors::QueryPlannerError; +use crate::executor::lru::{Cache, LRUCache}; +use crate::otm::statistics::table::{TarantoolSpace, QUERY}; +use std::cell::RefCell; + +pub const STATISTICS_CAPACITY: usize = 1000; +thread_local!(pub(super) static TRACKED_QUERIES: RefCell<TrackedQueries> = RefCell::new(TrackedQueries::new())); + +pub struct TrackedQueries { + queries: LRUCache<String, String>, +} + +#[allow(clippy::unnecessary_wraps)] +fn remove_query(query_id: &mut String) -> Result<(), QueryPlannerError> { + QUERY.with(|query_space| { + let mut query_space = query_space.borrow_mut(); + debug!( + Option::from("tracked queries"), + &format!("remove query: {}", query_id) + ); + let key = std::mem::take(query_id); + query_space.delete(&(key,)); + }); + Ok(()) +} + +impl Default for TrackedQueries { + fn default() -> Self { + Self::new() + } +} + +impl TrackedQueries { + /// Create a new instance of `TrackedQueries`. + /// + /// # Panics + /// - If the `STATISTICS_CAPACITY` is less than 1 (impossible at the moment). 
+ #[must_use] + pub fn new() -> Self { + Self { + queries: LRUCache::new(STATISTICS_CAPACITY, Some(Box::new(remove_query))).unwrap(), + } + } + + /// Add a new query to the tracked queries. + /// + /// # Errors + /// - Internal error in the eviction function. + pub fn push(&mut self, key: String) -> Result<(), QueryPlannerError> { + self.queries.put(key.clone(), key) + } +} diff --git a/sbroad-core/src/otm/statistics/table.rs b/sbroad-core/src/otm/statistics/table.rs new file mode 100644 index 0000000000000000000000000000000000000000..f2a1e6ed98ae1990509d11350c723bbf0e724f9b --- /dev/null +++ b/sbroad-core/src/otm/statistics/table.rs @@ -0,0 +1,605 @@ +//! Statistics table module. +//! +//! This module contains the implementation of the statistics spaces used by +//! the `statistics` span processor. There are two spaces and one hash table +//! used to store the statistics: +//! - `__sbroad_query` space - stores the queries that are currently being executed. +//! Its query id is used as a key for the `__sbroad_stat` space. +//! - `__sbroad_stat` space - stores the statistics for the query spans. The spans +//! are stored as a flat tree for each query. +//! - `SpanMap` hash table - stores the mapping between the span id and the span name. +//! The reason is that the span context contains only the span id, so we use +//! this table to save the span name when create the span (and remove it when +//! the span is finished). +//! +//! Keep in mind that the spaces are created in a "lazy" way, i.e. they are created +//! only when statistics processor tries to write to them. As we collect statistics +//! only for 1% of the queries, there is a chance that right after the instance start +//! the statistics spaces still would not exist. Though it is not a problem, such +//! situation can surprise the user. Also we should remember that read-only replicas +//! can not create spaces (even the temporary ones). As a result, the spaces can be +//! 
"lazily" created on the storages only when some of them are dispatched to the +//! master instance. + +use ahash::AHashMap; +use index::Part; +use opentelemetry::trace::SpanId; +use space::Field; +use std::borrow::Cow; +use std::cell::RefCell; +use tarantool::index::{IndexFieldType, IndexOptions, IndexType, IteratorType}; +use tarantool::space::{Space, SpaceCreateOptions, SpaceEngineType}; +use tarantool::{index, space}; + +use crate::{debug, warn}; + +thread_local!(pub(super) static QUERY: RefCell<QuerySpace> = RefCell::new(QuerySpace::new())); +thread_local!(pub(super) static SPAN: RefCell<SpanMap> = RefCell::new(SpanMap::new())); +thread_local!(pub(super) static STAT: RefCell<StatSpace> = RefCell::new(StatSpace::new())); + +pub trait RustMap { + type Key; + type Value; + + fn new() -> Self; + fn get(&self, key: &Self::Key) -> Option<&Self::Value>; + fn get_mut(&mut self, key: &Self::Key) -> Option<&mut Self::Value>; + fn push(&mut self, key: Self::Key, value: Self::Value); + fn pop(&mut self, key: &Self::Key) -> Option<Self::Value>; +} + +pub struct SpanMap(AHashMap<SpanId, SpanName>); + +impl RustMap for SpanMap { + type Key = SpanId; + type Value = SpanName; + + fn new() -> Self { + Self(AHashMap::new()) + } + + fn get(&self, key: &Self::Key) -> Option<&Self::Value> { + self.0.get(key) + } + + fn get_mut(&mut self, key: &Self::Key) -> Option<&mut Self::Value> { + self.0.get_mut(key) + } + + fn push(&mut self, key: Self::Key, value: Self::Value) { + self.0.insert(key, value); + } + + fn pop(&mut self, key: &Self::Key) -> Option<Self::Value> { + self.0.remove(key) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct SpanName(String); + +impl SpanName { + #[must_use] + pub fn value(&self) -> &str { + &self.0 + } +} + +impl From<&str> for SpanName { + fn from(s: &str) -> Self { + Self(s.to_string()) + } +} + +impl From<Cow<'static, str>> for SpanName { + fn from(s: Cow<'static, str>) -> Self { + Self(s.to_string()) + } +} + +pub trait TarantoolSpace 
{ + type Key; + type Tuple; + + fn new() -> Self; + fn get(&mut self, key: &Self::Key) -> Option<Self::Tuple>; + fn upsert(&mut self, tuple: Self::Tuple); + fn delete(&mut self, key: &Self::Key) -> Option<Self::Tuple>; + + /// Space name. + fn name(&self) -> &str; + + /// Reference to the cached space. + fn space(&self) -> &Option<Space>; + + /// Mutable reference to the cached space. + fn space_mut(&mut self) -> &mut Option<Space>; + + /// The space can be created only on the master node of the + /// replica set. The read-only replicas can't create spaces + /// (DDL would be replicated from master some moment later). + fn try_create_space(&mut self); + + /// Update the cached space information. This method should + /// be called before any space operation. + fn space_update_cache(&mut self) { + let name = self.name().to_string(); + match self.space_mut() { + Some(space) => { + let space_id = space.id(); + // Find the space to validate the its id. + match Space::find(&name) { + Some(found_space) => { + // The space was recreated by user, need to update the cache. + if found_space.id() != space_id { + warn!( + Option::from("space"), + &format!( + "Space {} was found with different id: {} != {}", + &name, + space_id, + found_space.id() + ) + ); + *space = found_space; + } + } + None => { + // Someone has removed the space from the instance. + // Remove the cached space and try to create it again. + *self.space_mut() = None; + self.try_create_space(); + } + } + } + None => { + // Try to find the space. + if let Some(space) = Space::find(&name) { + *self.space_mut() = Some(space); + } else { + // Try to create the space. 
+                    self.try_create_space();
+                }
+            }
+        }
+    }
+}
+/// Handle to the `__SBROAD_QUERY` space; rows are (QUERY_ID, QUERY_TEXT, REF_COUNTER).
+pub struct QuerySpace {
+    space: Option<Space>, // cached space handle; set by `try_create_space` on success
+    name: String, // space name; `new` sets it to "__SBROAD_QUERY"
+}
+// NOTE: `space_update_cache`, called by most methods below, is defined outside this chunk.
+impl TarantoolSpace for QuerySpace {
+    type Key = (String,); // single-part key; `delete` reads `key.0` as the query id
+    type Tuple = (String, String, u64); // mirrors the three fields declared in `try_create_space`
+    /// Builds a handle with no cached space, then calls `space_update_cache` on it.
+    fn new() -> Self {
+        let mut space = Self {
+            space: None,
+            name: "__SBROAD_QUERY".to_string(),
+        };
+        space.space_update_cache();
+        space
+    }
+    /// Returns the space name.
+    fn name(&self) -> &str {
+        &self.name
+    }
+    /// Returns a shared reference to the cached space handle, if any.
+    fn space(&self) -> &Option<Space> {
+        &self.space
+    }
+    /// Returns a mutable reference to the cached space handle slot.
+    fn space_mut(&mut self) -> &mut Option<Space> {
+        &mut self.space
+    }
+    /// Returns the first tuple matching `key`; select/decode failures are logged and give `None`.
+    fn get(&mut self, key: &Self::Key) -> Option<Self::Tuple> {
+        self.space_update_cache();
+        if let Some(space) = self.space() {
+            match space.select(IteratorType::Eq, key) {
+                Ok(mut iter) => {
+                    if let Some(tuple) = iter.next() { // only the first match is examined
+                        match tuple.decode::<Self::Tuple>() {
+                            Ok(tuple) => return Some(tuple),
+                            Err(_e) => {
+                                warn!(
+                                    Option::from("space query"),
+                                    &format!("Failed to decode tuple: {}", _e)
+                                );
+                            }
+                        }
+                    }
+                }
+                Err(_e) => {
+                    warn!(
+                        Option::from("space query"),
+                        &format!("Space {} select error: {}", self.name(), _e)
+                    );
+                }
+            }
+        }
+        None // no cached space, no matching row, or a failure logged above
+    }
+    /// Upserts `tuple` with the update operation `("+", 2, 1)`; a failure is only logged.
+    fn upsert(&mut self, tuple: Self::Tuple) {
+        self.space_update_cache();
+        if let Some(space) = self.space_mut() {
+            if let Err(_e) = space.upsert(&tuple, &[("+", 2, 1)]) { // presumably adds 1 to REF_COUNTER when the row exists — confirm field indexing
+                warn!(
+                    Option::from("space query"),
+                    &format!("Space {} upsert error: {}", self.name(), _e)
+                );
+            } else {
+                debug!(
+                    Option::from("space query"),
+                    &format!("increment ref_counter")
+                );
+            }
+        }
+    }
+    /// Releases one reference to `key`: a positive counter is decremented, otherwise the row is deleted.
+    fn delete(&mut self, key: &Self::Key) -> Option<Self::Tuple> {
+        self.space_update_cache();
+        if let Some(space) = self.space_mut() {
+            match space.select(IteratorType::Eq, key) {
+                Ok(mut iterator) => {
+                    if let Some(tuple) = iterator.next() {
+                        match tuple.decode::<Self::Tuple>() {
+                            Ok((_, _, ref_counter)) => {
+                                if ref_counter > 0 { // still referenced: apply ("-", 2, 1), then fall through to the final `None`
+                                    if let Err(_e) = space.upsert(&tuple, &[("-", 2, 1)]) {
+                                        warn!(
+                                            Option::from("space query"),
+                                            &format!("Space {} upsert error: {}", self.name(), _e)
+                                        );
+                                    } else {
+                                        debug!(
+                                            Option::from("space query"),
+                                            &format!("decrement ref_counter")
+                                        );
+                                    }
+                                } else { // counter is 0: remove the row and hand back what was deleted
+                                    let result = match space.delete(key) {
+                                        Ok(tuple) => {
+                                            if let Some(tuple) = tuple {
+                                                match tuple.decode::<Self::Tuple>() {
+                                                    Ok(tuple) => {
+                                                        debug!(
+                                                            Option::from("space query"),
+                                                            &format!("delete tuple")
+                                                        );
+                                                        Some(tuple)
+                                                    }
+                                                    Err(_e) => {
+                                                        warn!(
+                                                            Option::from("space query"),
+                                                            &format!(
+                                                                "Failed to decode tuple: {}",
+                                                                _e
+                                                            )
+                                                        );
+                                                        None
+                                                    }
+                                                }
+                                            } else {
+                                                None
+                                            }
+                                        }
+                                        Err(_e) => {
+                                            warn!(
+                                                Option::from("space query"),
+                                                &format!(
+                                                    "Space {} delete error: {}",
+                                                    self.name(),
+                                                    _e
+                                                )
+                                            );
+                                            None
+                                        }
+                                    };
+                                    STAT.with(|stat| { // `STAT` is declared outside this chunk; presumably a thread-local `RefCell<StatSpace>` — confirm
+                                        let query_id = key.0.as_str();
+                                        stat.borrow_mut().delete_query(query_id);
+                                    });
+                                    // NOTE(review): the STAT cleanup above runs even when the row delete failed.
+                                    return result;
+                                }
+                            }
+                            Err(err) => {
+                                let _msg = format!("Can't decode tuple: {}", err);
+                                warn!(Option::from("space query"), &_msg);
+                            }
+                        }
+                    }
+                }
+                Err(err) => {
+                    let _msg = &format!("failed to select tuple: {}", err);
+                    warn!(Option::from("space query"), &_msg);
+                }
+            }
+        }
+        None // also the result of a plain decrement: nothing was deleted
+    }
+    /// Creates the temporary memtx space and its unique tree index; caches the handle only if both succeed.
+    fn try_create_space(&mut self) {
+        let options = SpaceCreateOptions {
+            format: Some(vec![
+                Field::string("QUERY_ID"),
+                Field::string("QUERY_TEXT"),
+                Field::unsigned("REF_COUNTER"),
+            ]),
+            engine: SpaceEngineType::Memtx,
+            is_temporary: true,
+            is_local: false,
+            if_not_exists: true,
+            ..Default::default()
+        };
+        // Unique tree index over one string part; `Part::new(1, ..)` is presumably QUERY_ID — confirm indexing.
+        let pk = IndexOptions {
+            r#type: Some(IndexType::Tree),
+            unique: Some(true),
+            parts: Some(vec![Part::new(1, IndexFieldType::String)]),
+            if_not_exists: Some(true),
+            ..Default::default()
+        };
+        // A failure at either step is logged and leaves the cached handle untouched.
+        match Space::create(&self.name, &options) {
+            Ok(table) => {
+                debug!(Option::from("space query"), "Space created");
+                match table.create_index("__SBROAD_QUERY_PK", &pk) {
+                    Ok(_) => {
+                        debug!(
+                            Option::from("space query"),
+                            "Index __SBROAD_QUERY_PK created"
+                        );
+                        let space = self.space_mut();
+                        *space = Some(table);
+                    }
+                    Err(err) => {
+                        let _msg = &format!("failed to create index __SBROAD_QUERY_PK: {}", err);
+                        warn!(Option::from("space query"), &_msg);
+                    }
+                }
+            }
+            Err(_e) => {
+                warn!(
+                    Option::from("space query"),
+                    &format!("Failed to create a space: {}", _e)
+                );
+            }
+        }
+    }
+}
+/// Handle to the `__SBROAD_STAT` space holding per-query, per-span aggregates (min, max, sum, count).
+pub struct StatSpace {
+    space: Option<Space>, // cached space handle; set by `try_create_space` on success
+    name: String, // space name; `new` sets it to "__SBROAD_STAT"
+}
+/// Row layout (QUERY_ID, SPAN, PARENT_SPAN, MIN, MAX, SUM, COUNT), as declared in `try_create_space`.
+type StatSpaceTuple = (String, String, String, f64, f64, f64, u64);
+// NOTE: `space_update_cache`, called by most methods below, is defined outside this chunk.
+impl TarantoolSpace for StatSpace {
+    type Key = (String, String); // `upsert` builds it from the first two tuple elements
+    type Tuple = StatSpaceTuple;
+    /// Builds a handle with no cached space, then calls `space_update_cache` on it.
+    fn new() -> Self {
+        let mut space = Self {
+            space: None,
+            name: "__SBROAD_STAT".to_string(),
+        };
+        space.space_update_cache();
+        space
+    }
+    /// Returns the space name.
+    fn name(&self) -> &str {
+        &self.name
+    }
+    /// Returns a shared reference to the cached space handle, if any.
+    fn space(&self) -> &Option<Space> {
+        &self.space
+    }
+    /// Returns a mutable reference to the cached space handle slot.
+    fn space_mut(&mut self) -> &mut Option<Space> {
+        &mut self.space
+    }
+    /// Creates the temporary memtx space and its unique two-part tree index; caches the handle only if both succeed.
+    fn try_create_space(&mut self) {
+        let options = SpaceCreateOptions {
+            format: Some(vec![
+                Field::string("QUERY_ID"),
+                Field::string("SPAN"),
+                Field::string("PARENT_SPAN"),
+                Field::double("MIN"),
+                Field::double("MAX"),
+                Field::double("SUM"),
+                Field::unsigned("COUNT"),
+            ]),
+            engine: SpaceEngineType::Memtx,
+            is_temporary: true,
+            is_local: false,
+            if_not_exists: true,
+            ..Default::default()
+        };
+        // Unique tree index over two string parts; presumably QUERY_ID and SPAN — confirm indexing.
+        let pk = IndexOptions {
+            r#type: Some(IndexType::Tree),
+            unique: Some(true),
+            parts: Some(vec![
+                Part::new(1, IndexFieldType::String),
+                Part::new(2, IndexFieldType::String),
+            ]),
+            if_not_exists: Some(true),
+            ..Default::default()
+        };
+        // A failure at either step is logged and leaves the cached handle untouched.
+        match Space::create(&self.name, &options) {
+            Ok(table) => {
+                debug!(Option::from("space stat"), "Space created");
+                match table.create_index("__SBROAD_STAT_PK", &pk) {
+                    Ok(_) => {
+                        debug!(Option::from("space stat"), "Index __SBROAD_STAT_PK created");
+                        let space = self.space_mut();
+                        *space = Some(table);
+                    }
+                    Err(err) => {
+                        let _msg = &format!("failed to create index __SBROAD_STAT_PK: {}", err);
+                        warn!(Option::from("space stat"), &_msg);
+                    }
+                }
+            }
+            Err(_e) => {
+                warn!(
+                    Option::from("space stat"),
+                    &format!("Failed to create a space: {}", _e)
+                );
+            }
+        }
+    }
+    /// Returns the first tuple matching `key`; select/decode failures are logged and give `None`.
+    fn get(&mut self, key: &Self::Key) -> Option<Self::Tuple> {
+        self.space_update_cache();
+        if let Some(space) = self.space() {
+            match space.select(IteratorType::Eq, key) {
+                Err(_e) => {
+                    warn!(
+                        Option::from("space stat"),
+                        &format!("Space {} select error: {}", self.name(), _e)
+                    );
+                }
+                Ok(mut iter) => {
+                    if let Some(tuple) = iter.next() { // only the first match is examined
+                        match tuple.decode::<Self::Tuple>() {
+                            Ok(tuple) => {
+                                debug!(Option::from("space stat"), "get tuple");
+                                return Some(tuple);
+                            }
+                            Err(_e) => {
+                                warn!(
+                                    Option::from("space stat"),
+                                    &format!("Failed to decode tuple: {}", _e)
+                                );
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        None // no cached space, no matching row, or a failure logged above
+    }
+    /// Inserts `tuple` when its (tuple.0, tuple.1) key is absent; otherwise merges it into the stored row.
+    fn upsert(&mut self, tuple: Self::Tuple) {
+        self.space_update_cache(); // NOTE(review): `get` on the next line refreshes the cache again
+        match self.get(&(tuple.0.clone(), tuple.1.clone())) {
+            None => {
+                debug!(Option::from("space stat"), &format!("insert {:?}", tuple));
+                if let Some(space) = self.space_mut() {
+                    match space.insert(&tuple) {
+                        Ok(_) => {
+                            debug!(Option::from("space stat"), "inserted");
+                        }
+                        Err(_e) => {
+                            warn!(
+                                Option::from("space stat"),
+                                &format!("Space {} insert error: {}", self.name(), _e)
+                            );
+                        }
+                    }
+                }
+            }
+            Some(mut found_tuple) => {
+                debug!(
+                    Option::from("space stat"),
+                    &format!("update {:?} with {:?}", found_tuple, tuple)
+                );
+                found_tuple.3 = tuple.3.min(found_tuple.3); // MIN: keep the smaller value
+                found_tuple.4 = tuple.4.max(found_tuple.4); // MAX: keep the larger value
+                found_tuple.5 += tuple.5; // SUM: accumulate
+                found_tuple.6 += tuple.6; // COUNT: accumulate
+                if let Some(space) = self.space_mut() {
+                    match space.replace(&found_tuple) { // writes the merged row back over the stored one
+                        Ok(_) => {
+                            debug!(Option::from("space stat"), "replaced");
+                        }
+                        Err(_e) => {
+                            warn!(
+                                Option::from("space stat"),
+                                &format!("Space {} update error: {}", self.name(), _e)
+                            );
+                        }
+                    }
+                }
+            }
+        }
+    }
+    /// Deletes the row with `key` and returns it decoded; failures are logged and give `None`.
+    fn delete(&mut self, key: &Self::Key) -> Option<Self::Tuple> {
+        self.space_update_cache();
+        if let Some(space) = self.space_mut() {
+            match space.delete(&key) { // NOTE(review): `key` is already a reference, so this passes `&&Self::Key`; `QuerySpace::delete` passes `key`
+                Ok(tuple) => {
+                    if let Some(tuple) = tuple {
+                        match tuple.decode::<Self::Tuple>() {
+                            Ok(tuple) => {
+                                debug!(Option::from("space stat"), "deleted");
+                                return Some(tuple);
+                            }
+                            Err(_e) => {
+                                warn!(
+                                    Option::from("space stat"),
+                                    &format!("Failed to decode tuple: {}", _e)
+                                );
+                            }
+                        }
+                    }
+                }
+                Err(_e) => {
+                    warn!(
+                        Option::from("space stat"),
+                        &format!("Space {} delete error: {}", self.name(), _e)
+                    );
+                }
+            }
+        }
+        None // no cached space, nothing deleted, or a failure logged above
+    }
+}
+// Inherent helper outside the `TarantoolSpace` trait; `QuerySpace::delete` calls it through `STAT`.
+impl StatSpace {
+    fn delete_query(&mut self, query_id: &str) { // deletes every row returned by an `Eq` select on `(query_id,)`
+        self.space_update_cache();
+        let keys = if let Some(space) = self.space() {
+            let index_scan = match space.select(IteratorType::Eq, &(query_id,)) { // presumably a partial-key match on the first index part — confirm
+                Err(_e) => {
+                    warn!(
+                        Option::from("space stat"),
+                        &format!("Space {} select error: {}", self.name(), _e)
+                    );
+                    return;
+                }
+                Ok(iter) => iter,
+            };
+            index_scan
+                .filter_map(|tuple| match tuple.decode::<StatSpaceTuple>() {
+                    Ok(tuple) => {
+                        debug!(Option::from("space stat"), "get tuple");
+                        Some((tuple.0, tuple.1)) // keep only the two key parts
+                    }
+                    Err(_e) => {
+                        warn!(
+                            Option::from("space stat"),
+                            &format!("Failed to decode tuple: {}", _e)
+                        );
+                        None // undecodable rows are skipped, so they are not deleted
+                    }
+                })
+                .collect::<Vec<_>>() // keys are gathered first: `self.delete` below needs `&mut self`
+        } else {
+            return; // no cached space: nothing to delete
+        };
+        for key in keys {
+            self.delete(&key); // the returned tuple is discarded
+        }
+    }
+}