From c6961c92063b062c729ba80288136b8d9a804834 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Thu, 19 Oct 2023 12:42:15 +0700
Subject: [PATCH] feat!: refactor opentelemetry tracing

BREAKING CHANGE!: renamed __SBROAD_STAT into _sql_stat,
__SBROAD_QIERY into _sql_query.

1. _sql_stat primary key also includes the parent_span cause
   we support PG extended protocol and we need to bind same spans
   to different trees.
2. Fix some bugs in the telemetry for queries.
---
 sbroad-cartridge/src/api/exec_query.rs        |  2 +-
 .../test_app/test/integration/insert_test.lua | 16 +++---
 sbroad-core/src/executor/engine/helpers.rs    |  7 ++-
 sbroad-core/src/executor/protocol.rs          | 21 ++++----
 sbroad-core/src/otm.rs                        | 15 ++++--
 sbroad-core/src/otm/statistics.rs             | 30 +++++------
 sbroad-core/src/otm/statistics/table.rs       | 52 +++++++++----------
 7 files changed, 74 insertions(+), 69 deletions(-)

diff --git a/sbroad-cartridge/src/api/exec_query.rs b/sbroad-cartridge/src/api/exec_query.rs
index c0e4d8f145..a24721d30a 100644
--- a/sbroad-cartridge/src/api/exec_query.rs
+++ b/sbroad-cartridge/src/api/exec_query.rs
@@ -115,7 +115,7 @@ pub extern "C" fn execute(f_ctx: FunctionCtx, args: FunctionArgs) -> c_int {
         }
     };
 
-    let id: String = required.id().into();
+    let id = required.trace_id().to_string();
     let ctx = required.extract_context();
     let tracer = required.tracer();
 
diff --git a/sbroad-cartridge/test_app/test/integration/insert_test.lua b/sbroad-cartridge/test_app/test/integration/insert_test.lua
index e757a07f1c..e0d080ca55 100644
--- a/sbroad-cartridge/test_app/test/integration/insert_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/insert_test.lua
@@ -510,12 +510,12 @@ end
 local function assert_cache_hit(query_id)
     local storage1 = cluster:server("storage-1-1").net_box
     local r, err = storage1:call("box.execute", { [[
-    select span, query_id from "__SBROAD_STAT"
-    where span = '"tarantool.cache.hit.read.prepared"' and query_id = ? ]], { query_id } })
+    select "span", "query_id" from "_sql_stat"
+    where "span" = '"tarantool.cache.hit.read.prepared"' and "query_id" = ? ]], { query_id } })
     t.assert_equals(err, nil)
     t.assert_equals(r.metadata, {
-        { name = "SPAN", type = "string" },
-        { name = "QUERY_ID", type = "string" },
+        { name = "span", type = "string" },
+        { name = "query_id", type = "string" },
     })
     t.assert_equals(#r.rows, 1)
 end
@@ -523,13 +523,13 @@ end
 local function assert_cache_miss(query_id)
     local storage1 = cluster:server("storage-1-1").net_box
     local r, err = storage1:call("box.execute", { [[
-    select span, query_id from "__SBROAD_STAT"
-    where span = '"tarantool.cache.miss.read.prepared"' and query_id = ?
+    select "span", "query_id" from "_sql_stat"
+    where "span" = '"tarantool.cache.miss.read.prepared"' and "query_id" = ?
     ]] , { query_id }})
     t.assert_equals(err, nil)
     t.assert_equals(r.metadata, {
-        { name = "SPAN", type = "string" },
-        { name = "QUERY_ID", type = "string" },
+        { name = "span", type = "string" },
+        { name = "query_id", type = "string" },
     })
     t.assert_equals(#r.rows, 1)
 end
diff --git a/sbroad-core/src/executor/engine/helpers.rs b/sbroad-core/src/executor/engine/helpers.rs
index 4845dcc806..d34fa5ebd2 100644
--- a/sbroad-core/src/executor/engine/helpers.rs
+++ b/sbroad-core/src/executor/engine/helpers.rs
@@ -91,14 +91,13 @@ pub fn is_sharding_column_name(name: &str) -> bool {
 /// # Errors
 /// - Failed to encode the execution plan.
 pub fn encode_plan(mut exec_plan: ExecutionPlan) -> Result<(Binary, Binary), SbroadError> {
-    let query_type = exec_plan.query_type()?;
     let can_be_cached = exec_plan.vtables_empty();
+    let query_type = exec_plan.query_type()?;
     let (ordered, sub_plan_id) = match query_type {
         QueryType::DQL => {
+            let top_id = exec_plan.get_ir_plan().get_top()?;
             let sub_plan_id = if can_be_cached {
-                exec_plan
-                    .get_ir_plan()
-                    .pattern_id(exec_plan.get_ir_plan().get_top()?)?
+                exec_plan.get_ir_plan().pattern_id(top_id)?
             } else {
                 // plan id is used as cache key on storages, no need to calculate it.
                 String::new()
diff --git a/sbroad-core/src/executor/protocol.rs b/sbroad-core/src/executor/protocol.rs
index 01d1f81da0..da6ba3e581 100644
--- a/sbroad-core/src/executor/protocol.rs
+++ b/sbroad-core/src/executor/protocol.rs
@@ -81,7 +81,7 @@ pub struct RequiredData {
     pub can_be_cached: bool,
     context: ContextCarrier,
     tracer: QueryTracer,
-    trace_id: Option<String>,
+    trace_id: String,
     pub options: Options,
     pub schema_info: SchemaInfo,
 }
@@ -95,7 +95,7 @@ impl Default for RequiredData {
             can_be_cached: true,
             context: ContextCarrier::empty(),
             tracer: QueryTracer::default(),
-            trace_id: None,
+            trace_id: String::new(),
             options: Options::default(),
             schema_info: SchemaInfo::default(),
         }
@@ -143,6 +143,7 @@ impl RequiredData {
         let mut carrier = HashMap::new();
         inject_context(&mut carrier);
         let tracer = current_tracer();
+        let trace_id = current_id();
         if carrier.is_empty() {
             RequiredData {
                 plan_id,
@@ -151,7 +152,7 @@ impl RequiredData {
                 can_be_cached,
                 context: ContextCarrier::empty(),
                 tracer,
-                trace_id: None,
+                trace_id,
                 options,
                 schema_info,
             }
@@ -163,24 +164,26 @@ impl RequiredData {
                 can_be_cached,
                 context: ContextCarrier::new(carrier),
                 tracer,
-                trace_id: Some(current_id()),
+                trace_id,
                 options,
                 schema_info,
             }
         }
     }
 
+    #[must_use]
+    pub fn plan_id(&self) -> &str {
+        &self.plan_id
+    }
+
     #[must_use]
     pub fn tracer(&self) -> QueryTracer {
         self.tracer.clone()
     }
 
     #[must_use]
-    pub fn id(&self) -> &str {
-        match &self.trace_id {
-            Some(trace_id) => trace_id,
-            None => &self.plan_id,
-        }
+    pub fn trace_id(&self) -> &str {
+        &self.trace_id
     }
 
     pub fn extract_context(&mut self) -> Context {
diff --git a/sbroad-core/src/otm.rs b/sbroad-core/src/otm.rs
index 125809f502..6616ae341a 100644
--- a/sbroad-core/src/otm.rs
+++ b/sbroad-core/src/otm.rs
@@ -43,6 +43,8 @@ use crate::errors::{Entity, SbroadError};
 #[cfg(not(feature = "mock"))]
 use prod_imports::*;
 
+pub const OTM_CHAR_LIMIT: usize = 512;
+
 static TRACER_NAME: &str = "libsbroad";
 static RATIO: f64 = 0.01;
 thread_local!(
@@ -279,15 +281,19 @@ where
 #[inline]
 #[allow(unreachable_code)]
 #[allow(unused_variables)]
-pub fn stat_query_span<T, F>(name: &'static str, sql: &str, id: &str, f: F) -> T
+pub fn stat_query_span<T, F>(name: &'static str, sql: &str, id: &str, traceable: bool, f: F) -> T
 where
     F: FnOnce() -> T,
 {
     #[cfg(not(feature = "mock"))]
     {
-        let tracer = QueryTracer::Statistics;
+        let tracer = if traceable {
+            QueryTracer::TestStatistics
+        } else {
+            QueryTracer::Statistics
+        };
         let ctx = Context::new();
-        return query_span(name, id, &tracer, &ctx, sql, f);
+        return query_span(name, &id, &tracer, &ctx, sql, f);
     }
     f()
 }
@@ -314,8 +320,7 @@ where
             "query_sql",
             sql.char_indices()
                 // Maximum number of bytes per a single name-value pair: 4096.
-                // UTF-8 character can be up to 4 bytes long, `query_sql` is 9 bytes long.
-                .filter_map(|(i, c)| if i <= 4083 { Some(c) } else { None })
+                .filter_map(|(i, c)| if i <= OTM_CHAR_LIMIT { Some(c) } else { None })
                 .collect::<String>(),
         ));
 
diff --git a/sbroad-core/src/otm/statistics.rs b/sbroad-core/src/otm/statistics.rs
index 599cafa960..51ff0e1b4c 100644
--- a/sbroad-core/src/otm/statistics.rs
+++ b/sbroad-core/src/otm/statistics.rs
@@ -2,7 +2,7 @@
 //!
 //! This span processor is used to collect statistics about running queries.
 //! It uses sampling (1% at the moment) to reduce the overhead of collecting
-//! statistics. The results are written to `__sbroad_query` and `__sbroad_stat`
+//! statistics. The results are written to `_sql_query` and `_sql_stat`
 //! spaces and evicted by LRU strategy (more details in the `table` and
 //! `eviction` modules).
 //!
@@ -10,27 +10,27 @@
 //! For example, to get the top 5 most expensive SELECT SQL queries by the
 //! average execution time:
 //! ```sql
-//! select distinct(q.query_text) from __sbroad_stat as s
-//! join __sbroad_query as q
-//!     on s.query_id = q.query_id
-//! where lower(q.query_text) like 'select%'
-//! order by s.sum/s.count desc
+//! select distinct(q."query_text") from "_sql_stat" as s
+//! join "_sql_query" as q
+//!     on s."query_id" = q."query_id"
+//! where lower(q."query_text") like 'select%'
+//! order by s."sum"/s."count" desc
 //! limit 5
 //! ```
 //!
 //! Or to get the flame graph of the most expensive query:
 //! ```sql
 //! with recursive st as (
-//!     select * from __sbroad_stat where query_id in (select qt.query_id from qt)
-//!         and parent_span = ''
+//!     select * from "_sql_stat" where "query_id" in (select qt."query_id" from qt)
+//!         and "parent_span" = ''
 //!     union all
-//!     select s.* from __sbroad_stat as s, st on s.parent_span = st.span
-//!         and s.query_id in (select qt.query_id from qt)
+//!     select s.* from "_sql_stat" as s, st on s."parent_span" = st."span"
+//!         and s."query_id" in (select qt."query_id" from qt)
 //! ), qt as (
-//!     select s.query_id from __sbroad_stat as s
-//!     join __sbroad_query as q
-//!         on s.query_id = q.query_id
-//!     order by s.sum/s.count desc
+//!     select s."query_id" from "_sql_stat" as s
+//!     join "_sql_query" as q
+//!         on s."query_id" = q."query_id"
+//!     order by s."sum"/s."count" desc
 //!     limit 1
 //! )
 //! select * from st;
@@ -79,7 +79,7 @@ impl SpanProcessor for StatCollector {
         // We are processing a top level query span. Lets register it.
         if let Some(query_sql) = span_data.attributes.get(&"query_sql".into()) {
             QUERY.with(|query_space| {
-                let tuple = (id.to_string(), query_sql.to_string(), 2);
+                let tuple = (id, query_sql.to_string(), 2);
                 query_space.borrow_mut().upsert(tuple);
             });
             debug!(Option::from("on start"), &format!("query: {query_sql}"));
diff --git a/sbroad-core/src/otm/statistics/table.rs b/sbroad-core/src/otm/statistics/table.rs
index 2bfa9beb99..e06aab46e0 100644
--- a/sbroad-core/src/otm/statistics/table.rs
+++ b/sbroad-core/src/otm/statistics/table.rs
@@ -3,9 +3,9 @@
 //! This module contains the implementation of the statistics spaces used by
 //! the `statistics` span processor. There are two spaces and one hash table
 //! used to store the statistics:
-//! - `__sbroad_query` space - stores the queries that are currently being executed.
-//!   Its query id is used as a key for the `__sbroad_stat` space.
-//! - `__sbroad_stat` space - stores the statistics for the query spans. The spans
+//! - `_sql_query` space - stores the queries that are currently being executed.
+//!   Its query id is used as a key for the `_sql_stat` space.
+//! - `_sql_stat` space - stores the statistics for the query spans. The spans
 //!   are stored as a flat tree for each query.
 //! - `SpanMap` hash table - stores the mapping between the span id and the span name.
 //!   The reason is that the span context contains only the span id, so we use
@@ -174,7 +174,7 @@ impl TarantoolSpace for QuerySpace {
     fn new() -> Self {
         let mut space = Self {
             space: None,
-            name: "__SBROAD_QUERY".to_string(),
+            name: "_sql_query".to_string(),
         };
         space.space_update_cache();
         space
@@ -323,9 +323,9 @@ impl TarantoolSpace for QuerySpace {
     fn try_create_space(&mut self) {
         let options = SpaceCreateOptions {
             format: Some(vec![
-                Field::string("QUERY_ID"),
-                Field::string("QUERY_TEXT"),
-                Field::unsigned("REF_COUNTER"),
+                Field::string("query_id"),
+                Field::string("query_text"),
+                Field::unsigned("ref_counter"),
             ]),
             engine: SpaceEngineType::Memtx,
             space_type: SpaceType::DataTemporary,
@@ -344,17 +344,14 @@ impl TarantoolSpace for QuerySpace {
         match Space::create(&self.name, &options) {
             Ok(table) => {
                 debug!(Option::from("space query"), "Space created");
-                match table.create_index("__SBROAD_QUERY_PK", &pk) {
+                match table.create_index("_sql_query_pk", &pk) {
                     Ok(_) => {
-                        debug!(
-                            Option::from("space query"),
-                            "Index __SBROAD_QUERY_PK created"
-                        );
+                        debug!(Option::from("space query"), "Index _sql_query_pk created");
                         let space = self.space_mut();
                         *space = Some(table);
                     }
                     Err(err) => {
-                        let _msg = &format!("failed to create index __SBROAD_QUERY_PK: {err}");
+                        let _msg = &format!("failed to create index _sql_query_pk: {err}");
                         warn!(Option::from("space query"), &_msg);
                     }
                 }
@@ -377,13 +374,13 @@ pub struct StatSpace {
 type StatSpaceTuple = (String, String, String, f64, f64, f64, u64);
 
 impl TarantoolSpace for StatSpace {
-    type Key = (String, String);
+    type Key = (String, String, String);
     type Tuple = StatSpaceTuple;
 
     fn new() -> Self {
         let mut space = Self {
             space: None,
-            name: "__SBROAD_STAT".to_string(),
+            name: "_sql_stat".to_string(),
         };
         space.space_update_cache();
         space
@@ -404,13 +401,13 @@ impl TarantoolSpace for StatSpace {
     fn try_create_space(&mut self) {
         let options = SpaceCreateOptions {
             format: Some(vec![
-                Field::string("QUERY_ID"),
-                Field::string("SPAN"),
-                Field::string("PARENT_SPAN"),
-                Field::double("MIN"),
-                Field::double("MAX"),
-                Field::double("SUM"),
-                Field::unsigned("COUNT"),
+                Field::string("query_id"),
+                Field::string("span"),
+                Field::string("parent_span"),
+                Field::double("min"),
+                Field::double("max"),
+                Field::double("sum"),
+                Field::unsigned("count"),
             ]),
             engine: SpaceEngineType::Memtx,
             space_type: SpaceType::DataTemporary,
@@ -424,6 +421,7 @@ impl TarantoolSpace for StatSpace {
             parts: Some(vec![
                 Part::new(1, FieldType::String),
                 Part::new(2, FieldType::String),
+                Part::new(3, FieldType::String),
             ]),
             if_not_exists: Some(true),
             ..Default::default()
@@ -432,14 +430,14 @@ impl TarantoolSpace for StatSpace {
         match Space::create(&self.name, &options) {
             Ok(table) => {
                 debug!(Option::from("space stat"), "Space created");
-                match table.create_index("__SBROAD_STAT_PK", &pk) {
+                match table.create_index("_sql_stat_pk", &pk) {
                     Ok(_) => {
-                        debug!(Option::from("space stat"), "Index __SBROAD_STAT_PK created");
+                        debug!(Option::from("space stat"), "Index _sql_stat_pk created");
                         let space = self.space_mut();
                         *space = Some(table);
                     }
                     Err(err) => {
-                        let _msg = &format!("failed to create index __SBROAD_STAT_PK: {err}");
+                        let _msg = &format!("failed to create index _sql_stat_pk: {err}");
                         warn!(Option::from("space stat"), &_msg);
                     }
                 }
@@ -486,7 +484,7 @@ impl TarantoolSpace for StatSpace {
 
     fn upsert(&mut self, tuple: Self::Tuple) {
         self.space_update_cache();
-        match self.get(&(tuple.0.clone(), tuple.1.clone())) {
+        match self.get(&(tuple.0.clone(), tuple.1.clone(), tuple.2.clone())) {
             None => {
                 debug!(Option::from("space stat"), &format!("insert {:?}", tuple));
                 if let Some(space) = self.space_mut() {
@@ -579,7 +577,7 @@ impl StatSpace {
                 .filter_map(|tuple| match tuple.decode::<StatSpaceTuple>() {
                     Ok(tuple) => {
                         debug!(Option::from("space stat"), "get tuple");
-                        Some((tuple.0, tuple.1))
+                        Some((tuple.0, tuple.1, tuple.2))
                     }
                     Err(_e) => {
                         warn!(
-- 
GitLab