Skip to content
Snippets Groups Projects
Verified Commit c6961c92 authored by Denis Smirnov's avatar Denis Smirnov
Browse files

feat!: refactor opentelemetry tracing

BREAKING CHANGE!: renamed __SBROAD_STAT into _sql_stat,
__SBROAD_QIERY into _sql_query.

1. _sql_stat primary key also includes the parent_span cause
   we support PG extended protocol and we need to bind same spans
   to different trees.
2. Fix some bugs in the telemetry for queries.
parent 0daadbe9
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -115,7 +115,7 @@ pub extern "C" fn execute(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int {
}
};
let id: String = required.id().into();
let id = required.trace_id().to_string();
let ctx = required.extract_context();
let tracer = required.tracer();
......
......@@ -510,12 +510,12 @@ end
local function assert_cache_hit(query_id)
local storage1 = cluster:server("storage-1-1").net_box
local r, err = storage1:call("box.execute", { [[
select span, query_id from "__SBROAD_STAT"
where span = '"tarantool.cache.hit.read.prepared"' and query_id = ? ]], { query_id } })
select "span", "query_id" from "_sql_stat"
where "span" = '"tarantool.cache.hit.read.prepared"' and "query_id" = ? ]], { query_id } })
t.assert_equals(err, nil)
t.assert_equals(r.metadata, {
{ name = "SPAN", type = "string" },
{ name = "QUERY_ID", type = "string" },
{ name = "span", type = "string" },
{ name = "query_id", type = "string" },
})
t.assert_equals(#r.rows, 1)
end
......@@ -523,13 +523,13 @@ end
local function assert_cache_miss(query_id)
local storage1 = cluster:server("storage-1-1").net_box
local r, err = storage1:call("box.execute", { [[
select span, query_id from "__SBROAD_STAT"
where span = '"tarantool.cache.miss.read.prepared"' and query_id = ?
select "span", "query_id" from "_sql_stat"
where "span" = '"tarantool.cache.miss.read.prepared"' and "query_id" = ?
]] , { query_id }})
t.assert_equals(err, nil)
t.assert_equals(r.metadata, {
{ name = "SPAN", type = "string" },
{ name = "QUERY_ID", type = "string" },
{ name = "span", type = "string" },
{ name = "query_id", type = "string" },
})
t.assert_equals(#r.rows, 1)
end
......
......@@ -91,14 +91,13 @@ pub fn is_sharding_column_name(name: &str) -> bool {
/// # Errors
/// - Failed to encode the execution plan.
pub fn encode_plan(mut exec_plan: ExecutionPlan) -> Result<(Binary, Binary), SbroadError> {
let query_type = exec_plan.query_type()?;
let can_be_cached = exec_plan.vtables_empty();
let query_type = exec_plan.query_type()?;
let (ordered, sub_plan_id) = match query_type {
QueryType::DQL => {
let top_id = exec_plan.get_ir_plan().get_top()?;
let sub_plan_id = if can_be_cached {
exec_plan
.get_ir_plan()
.pattern_id(exec_plan.get_ir_plan().get_top()?)?
exec_plan.get_ir_plan().pattern_id(top_id)?
} else {
// plan id is used as cache key on storages, no need to calculate it.
String::new()
......
......@@ -81,7 +81,7 @@ pub struct RequiredData {
pub can_be_cached: bool,
context: ContextCarrier,
tracer: QueryTracer,
trace_id: Option<String>,
trace_id: String,
pub options: Options,
pub schema_info: SchemaInfo,
}
......@@ -95,7 +95,7 @@ impl Default for RequiredData {
can_be_cached: true,
context: ContextCarrier::empty(),
tracer: QueryTracer::default(),
trace_id: None,
trace_id: String::new(),
options: Options::default(),
schema_info: SchemaInfo::default(),
}
......@@ -143,6 +143,7 @@ impl RequiredData {
let mut carrier = HashMap::new();
inject_context(&mut carrier);
let tracer = current_tracer();
let trace_id = current_id();
if carrier.is_empty() {
RequiredData {
plan_id,
......@@ -151,7 +152,7 @@ impl RequiredData {
can_be_cached,
context: ContextCarrier::empty(),
tracer,
trace_id: None,
trace_id,
options,
schema_info,
}
......@@ -163,24 +164,26 @@ impl RequiredData {
can_be_cached,
context: ContextCarrier::new(carrier),
tracer,
trace_id: Some(current_id()),
trace_id,
options,
schema_info,
}
}
}
#[must_use]
pub fn plan_id(&self) -> &str {
&self.plan_id
}
#[must_use]
pub fn tracer(&self) -> QueryTracer {
self.tracer.clone()
}
#[must_use]
pub fn id(&self) -> &str {
match &self.trace_id {
Some(trace_id) => trace_id,
None => &self.plan_id,
}
pub fn trace_id(&self) -> &str {
&self.trace_id
}
pub fn extract_context(&mut self) -> Context {
......
......@@ -43,6 +43,8 @@ use crate::errors::{Entity, SbroadError};
#[cfg(not(feature = "mock"))]
use prod_imports::*;
pub const OTM_CHAR_LIMIT: usize = 512;
static TRACER_NAME: &str = "libsbroad";
static RATIO: f64 = 0.01;
thread_local!(
......@@ -279,15 +281,19 @@ where
#[inline]
#[allow(unreachable_code)]
#[allow(unused_variables)]
pub fn stat_query_span<T, F>(name: &'static str, sql: &str, id: &str, f: F) -> T
pub fn stat_query_span<T, F>(name: &'static str, sql: &str, id: &str, traceable: bool, f: F) -> T
where
F: FnOnce() -> T,
{
#[cfg(not(feature = "mock"))]
{
let tracer = QueryTracer::Statistics;
let tracer = if traceable {
QueryTracer::TestStatistics
} else {
QueryTracer::Statistics
};
let ctx = Context::new();
return query_span(name, id, &tracer, &ctx, sql, f);
return query_span(name, &id, &tracer, &ctx, sql, f);
}
f()
}
......@@ -314,8 +320,7 @@ where
"query_sql",
sql.char_indices()
// Maximum number of bytes per a single name-value pair: 4096.
// UTF-8 character can be up to 4 bytes long, `query_sql` is 9 bytes long.
.filter_map(|(i, c)| if i <= 4083 { Some(c) } else { None })
.filter_map(|(i, c)| if i <= OTM_CHAR_LIMIT { Some(c) } else { None })
.collect::<String>(),
));
......
......@@ -2,7 +2,7 @@
//!
//! This span processor is used to collect statistics about running queries.
//! It uses sampling (1% at the moment) to reduce the overhead of collecting
//! statistics. The results are written to `__sbroad_query` and `__sbroad_stat`
//! statistics. The results are written to `_sql_query` and `_sql_stat`
//! spaces and evicted by LRU strategy (more details in the `table` and
//! `eviction` modules).
//!
......@@ -10,27 +10,27 @@
//! For example, to get the top 5 most expensive SELECT SQL queries by the
//! average execution time:
//! ```sql
//! select distinct(q.query_text) from __sbroad_stat as s
//! join __sbroad_query as q
//! on s.query_id = q.query_id
//! where lower(q.query_text) like 'select%'
//! order by s.sum/s.count desc
//! select distinct(q."query_text") from "_sql_stat" as s
//! join "_sql_query" as q
//! on s."query_id" = q."query_id"
//! where lower(q."query_text") like 'select%'
//! order by s."sum"/s."count" desc
//! limit 5
//! ```
//!
//! Or to get the flame graph of the most expensive query:
//! ```sql
//! with recursive st as (
//! select * from __sbroad_stat where query_id in (select qt.query_id from qt)
//! and parent_span = ''
//! select * from "_sql_stat" where "query_id" in (select qt."query_id" from qt)
//! and "parent_span" = ''
//! union all
//! select s.* from __sbroad_stat as s, st on s.parent_span = st.span
//! and s.query_id in (select qt.query_id from qt)
//! select s.* from "_sql_stat" as s, st on s."parent_span" = st."span"
//! and s."query_id" in (select qt."query_id" from qt)
//! ), qt as (
//! select s.query_id from __sbroad_stat as s
//! join __sbroad_query as q
//! on s.query_id = q.query_id
//! order by s.sum/s.count desc
//! select s."query_id" from "_sql_stat" as s
//! join "_sql_query" as q
//! on s."query_id" = q."query_id"
//! order by s."sum"/s."count" desc
//! limit 1
//! )
//! select * from st;
......@@ -79,7 +79,7 @@ impl SpanProcessor for StatCollector {
// We are processing a top level query span. Lets register it.
if let Some(query_sql) = span_data.attributes.get(&"query_sql".into()) {
QUERY.with(|query_space| {
let tuple = (id.to_string(), query_sql.to_string(), 2);
let tuple = (id, query_sql.to_string(), 2);
query_space.borrow_mut().upsert(tuple);
});
debug!(Option::from("on start"), &format!("query: {query_sql}"));
......
......@@ -3,9 +3,9 @@
//! This module contains the implementation of the statistics spaces used by
//! the `statistics` span processor. There are two spaces and one hash table
//! used to store the statistics:
//! - `__sbroad_query` space - stores the queries that are currently being executed.
//! Its query id is used as a key for the `__sbroad_stat` space.
//! - `__sbroad_stat` space - stores the statistics for the query spans. The spans
//! - `_sql_query` space - stores the queries that are currently being executed.
//! Its query id is used as a key for the `_sql_stat` space.
//! - `_sql_stat` space - stores the statistics for the query spans. The spans
//! are stored as a flat tree for each query.
//! - `SpanMap` hash table - stores the mapping between the span id and the span name.
//! The reason is that the span context contains only the span id, so we use
......@@ -174,7 +174,7 @@ impl TarantoolSpace for QuerySpace {
fn new() -> Self {
let mut space = Self {
space: None,
name: "__SBROAD_QUERY".to_string(),
name: "_sql_query".to_string(),
};
space.space_update_cache();
space
......@@ -323,9 +323,9 @@ impl TarantoolSpace for QuerySpace {
fn try_create_space(&mut self) {
let options = SpaceCreateOptions {
format: Some(vec![
Field::string("QUERY_ID"),
Field::string("QUERY_TEXT"),
Field::unsigned("REF_COUNTER"),
Field::string("query_id"),
Field::string("query_text"),
Field::unsigned("ref_counter"),
]),
engine: SpaceEngineType::Memtx,
space_type: SpaceType::DataTemporary,
......@@ -344,17 +344,14 @@ impl TarantoolSpace for QuerySpace {
match Space::create(&self.name, &options) {
Ok(table) => {
debug!(Option::from("space query"), "Space created");
match table.create_index("__SBROAD_QUERY_PK", &pk) {
match table.create_index("_sql_query_pk", &pk) {
Ok(_) => {
debug!(
Option::from("space query"),
"Index __SBROAD_QUERY_PK created"
);
debug!(Option::from("space query"), "Index _sql_query_pk created");
let space = self.space_mut();
*space = Some(table);
}
Err(err) => {
let _msg = &format!("failed to create index __SBROAD_QUERY_PK: {err}");
let _msg = &format!("failed to create index _sql_query_pk: {err}");
warn!(Option::from("space query"), &_msg);
}
}
......@@ -377,13 +374,13 @@ pub struct StatSpace {
type StatSpaceTuple = (String, String, String, f64, f64, f64, u64);
impl TarantoolSpace for StatSpace {
type Key = (String, String);
type Key = (String, String, String);
type Tuple = StatSpaceTuple;
fn new() -> Self {
let mut space = Self {
space: None,
name: "__SBROAD_STAT".to_string(),
name: "_sql_stat".to_string(),
};
space.space_update_cache();
space
......@@ -404,13 +401,13 @@ impl TarantoolSpace for StatSpace {
fn try_create_space(&mut self) {
let options = SpaceCreateOptions {
format: Some(vec![
Field::string("QUERY_ID"),
Field::string("SPAN"),
Field::string("PARENT_SPAN"),
Field::double("MIN"),
Field::double("MAX"),
Field::double("SUM"),
Field::unsigned("COUNT"),
Field::string("query_id"),
Field::string("span"),
Field::string("parent_span"),
Field::double("min"),
Field::double("max"),
Field::double("sum"),
Field::unsigned("count"),
]),
engine: SpaceEngineType::Memtx,
space_type: SpaceType::DataTemporary,
......@@ -424,6 +421,7 @@ impl TarantoolSpace for StatSpace {
parts: Some(vec![
Part::new(1, FieldType::String),
Part::new(2, FieldType::String),
Part::new(3, FieldType::String),
]),
if_not_exists: Some(true),
..Default::default()
......@@ -432,14 +430,14 @@ impl TarantoolSpace for StatSpace {
match Space::create(&self.name, &options) {
Ok(table) => {
debug!(Option::from("space stat"), "Space created");
match table.create_index("__SBROAD_STAT_PK", &pk) {
match table.create_index("_sql_stat_pk", &pk) {
Ok(_) => {
debug!(Option::from("space stat"), "Index __SBROAD_STAT_PK created");
debug!(Option::from("space stat"), "Index _sql_stat_pk created");
let space = self.space_mut();
*space = Some(table);
}
Err(err) => {
let _msg = &format!("failed to create index __SBROAD_STAT_PK: {err}");
let _msg = &format!("failed to create index _sql_stat_pk: {err}");
warn!(Option::from("space stat"), &_msg);
}
}
......@@ -486,7 +484,7 @@ impl TarantoolSpace for StatSpace {
fn upsert(&mut self, tuple: Self::Tuple) {
self.space_update_cache();
match self.get(&(tuple.0.clone(), tuple.1.clone())) {
match self.get(&(tuple.0.clone(), tuple.1.clone(), tuple.2.clone())) {
None => {
debug!(Option::from("space stat"), &format!("insert {:?}", tuple));
if let Some(space) = self.space_mut() {
......@@ -579,7 +577,7 @@ impl StatSpace {
.filter_map(|tuple| match tuple.decode::<StatSpaceTuple>() {
Ok(tuple) => {
debug!(Option::from("space stat"), "get tuple");
Some((tuple.0, tuple.1))
Some((tuple.0, tuple.1, tuple.2))
}
Err(_e) => {
warn!(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment