From 646ec5c8be320483e4bba8e52c8b591074b4951a Mon Sep 17 00:00:00 2001 From: Denis Smirnov <sd@picodata.io> Date: Fri, 9 Sep 2022 17:43:05 +0700 Subject: [PATCH] feat: make jaeger agent address configurable in sbroad --- src/executor/engine/cartridge.rs | 5 ++- src/executor/engine/cartridge/config.rs | 32 +++++++++++++++++++ src/executor/engine/cartridge/router.rs | 40 +++++++++++++++++++++++- src/executor/engine/cartridge/storage.rs | 36 ++++++++++++++++++++- src/router.lua | 20 ++++++++++++ src/storage.lua | 20 ++++++++++++ test_app/test/helper.lua | 2 ++ 7 files changed, 150 insertions(+), 5 deletions(-) diff --git a/src/executor/engine/cartridge.rs b/src/executor/engine/cartridge.rs index e74baac6c6..d8b6066a39 100644 --- a/src/executor/engine/cartridge.rs +++ b/src/executor/engine/cartridge.rs @@ -14,15 +14,14 @@ use opentelemetry::sdk::propagation::{TextMapCompositePropagator, TraceContextPr static SERVICE_NAME: &str = "sbroad"; /// Update the opentelemetry global trace provider and tracer. -/// Use `OTEL_EXPORTER_JAEGER_AGENT_HOST` and `OTEL_EXPORTER_JAEGER_AGENT_PORT` -/// environment variables to configure the Jaeger agent's endpoint. /// /// # Errors /// - failed to build OTM global trace provider -pub fn update_tracing() -> Result<(), QueryPlannerError> { +pub fn update_tracing(host: &str, port: u16) -> Result<(), QueryPlannerError> { let propagator = TextMapCompositePropagator::new(vec![Box::new(TraceContextPropagator::new())]); set_text_map_propagator(propagator); let provider = opentelemetry_jaeger::new_pipeline() + .with_agent_endpoint(&format!("{}:{}", host, port)) .with_service_name(SERVICE_NAME) .build_simple() .map_err(|e| { diff --git a/src/executor/engine/cartridge/config.rs b/src/executor/engine/cartridge/config.rs index a262c6ce8d..cd8616a4a2 100644 --- a/src/executor/engine/cartridge/config.rs +++ b/src/executor/engine/cartridge/config.rs @@ -29,6 +29,12 @@ pub struct RouterConfiguration { /// Sharding column names. sharding_column: String, + /// Jaeger agent host. + jaeger_agent_host: String, + + /// Jaeger agent port. + jaeger_agent_port: u16, + /// IR table segments from the cluster spaces tables: HashMap<String, Table>, } @@ -45,6 +51,8 @@ impl RouterConfiguration { RouterConfiguration { waiting_timeout: 360, cache_capacity: DEFAULT_CAPACITY, + jaeger_agent_host: "localhost".to_string(), + jaeger_agent_port: 6831, tables: HashMap::new(), sharding_column: String::new(), } @@ -201,6 +209,24 @@ impl RouterConfiguration { Ok(()) } + #[must_use] + pub fn get_jaeger_agent_host(&self) -> &str { + self.jaeger_agent_host.as_str() + } + + #[must_use] + pub fn get_jaeger_agent_port(&self) -> u16 { + self.jaeger_agent_port + } + + pub fn set_jaeger_agent_host(&mut self, host: String) { + self.jaeger_agent_host = host; + } + + pub fn set_jaeger_agent_port(&mut self, port: u16) { + self.jaeger_agent_port = port; + } + /// Setup response waiting timeout for executor pub fn set_waiting_timeout(&mut self, timeout: u64) { if timeout > 0 { @@ -274,6 +300,10 @@ pub struct StorageConfiguration { /// If a new statement is bigger doesn't fit into the cache, /// it would not be cached but executed directly. pub storage_size_bytes: usize, + /// Jaeger agent host + pub jaeger_agent_host: String, + /// Jaeger agent port + pub jaeger_agent_port: u16, } impl Default for StorageConfiguration { @@ -288,6 +318,8 @@ impl StorageConfiguration { StorageConfiguration { storage_capacity: 0, storage_size_bytes: 0, + jaeger_agent_host: "localhost".to_string(), + jaeger_agent_port: 6831, } } diff --git a/src/executor/engine/cartridge/router.rs b/src/executor/engine/cartridge/router.rs index cbcfbc61d6..66f17c82ea 100644 --- a/src/executor/engine/cartridge/router.rs +++ b/src/executor/engine/cartridge/router.rs @@ -59,6 +59,7 @@ impl Configuration for RouterRuntime { self.metadata.is_empty() } + #[allow(clippy::too_many_lines)] fn get_config(&self) -> Result<Option<Self::Configuration>, QueryPlannerError> { if self.is_config_empty() { let lua = tarantool::lua_state(); @@ -78,6 +79,38 @@ impl Configuration for RouterRuntime { } }; + let jaeger_agent_host: LuaFunction<_> = + lua.eval("return get_jaeger_agent_host;").unwrap(); + let jaeger_host: String = match jaeger_agent_host.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent host"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + + let jaeger_agent_port: LuaFunction<_> = + lua.eval("return get_jaeger_agent_port;").unwrap(); + let jaeger_port: u16 = match jaeger_agent_port.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent port"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + let waiting_timeout: LuaFunction<_> = lua.eval("return get_waiting_timeout;").unwrap(); let timeout: u64 = match waiting_timeout.call() { Ok(res) => res, @@ -133,12 +166,17 @@ impl Configuration for RouterRuntime { }; let mut metadata = RouterConfiguration::new(); + metadata.set_jaeger_agent_host(jaeger_host); + metadata.set_jaeger_agent_port(jaeger_port); metadata.set_waiting_timeout(timeout); metadata.set_cache_capacity(router_capacity); metadata.set_sharding_column(normalize_name_from_schema(column.as_str())); // We should always load the schema **after** setting the sharding column. metadata.load_schema(&schema)?; - update_tracing()?; + update_tracing( + metadata.get_jaeger_agent_host(), + metadata.get_jaeger_agent_port(), + )?; return Ok(Some(metadata)); } diff --git a/src/executor/engine/cartridge/storage.rs b/src/executor/engine/cartridge/storage.rs index e355a3fcd2..0f87c523d3 100644 --- a/src/executor/engine/cartridge/storage.rs +++ b/src/executor/engine/cartridge/storage.rs @@ -109,10 +109,44 @@ impl Configuration for StorageRuntime { let storage_size_bytes = usize::try_from(cache_size_bytes) .map_err(|e| QueryPlannerError::CustomError(format!("{}", e)))?; + let jaeger_agent_host: LuaFunction<_> = + lua.eval("return get_jaeger_agent_host;").unwrap(); + let jaeger_host: String = match jaeger_agent_host.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent host"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + + let jaeger_agent_port: LuaFunction<_> = + lua.eval("return get_jaeger_agent_port;").unwrap(); + let jaeger_port: u16 = match jaeger_agent_port.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent port"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + let mut metadata = StorageConfiguration::new(); metadata.storage_capacity = storage_capacity; metadata.storage_size_bytes = storage_size_bytes; - update_tracing()?; + metadata.jaeger_agent_host = jaeger_host; + metadata.jaeger_agent_port = jaeger_port; + update_tracing(&metadata.jaeger_agent_host, metadata.jaeger_agent_port)?; return Ok(Some(metadata)); } diff --git a/src/router.lua b/src/router.lua index 36c16746a7..bccdd5ffc9 100644 --- a/src/router.lua +++ b/src/router.lua @@ -8,6 +8,26 @@ _G.get_schema = function() return cartridge.get_schema() end +_G.get_jaeger_agent_host = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_host"] == nil then + return "localhost" + end + + return cfg["jaeger_agent_host"] +end + +_G.get_jaeger_agent_port = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_port"] == nil then + return 6831 + end + + return cfg["jaeger_agent_port"] +end + _G.get_waiting_timeout = function() local cfg = cartridge.config_get_readonly() diff --git a/src/storage.lua b/src/storage.lua index dd79b409a7..074cbec3a7 100644 --- a/src/storage.lua +++ b/src/storage.lua @@ -22,6 +22,26 @@ _G.get_storage_cache_size_bytes = function() return cfg["storage_cache_size_bytes"] end +_G.get_jaeger_agent_host = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_host"] == nil then + return "localhost" + end + + return cfg["jaeger_agent_host"] +end + +_G.get_jaeger_agent_port = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_port"] == nil then + return 6831 + end + + return cfg["jaeger_agent_port"] +end + _G.prepare = function(pattern) local prep, err = box.prepare(pattern) if err ~= nil then diff --git a/test_app/test/helper.lua b/test_app/test/helper.lua index 0ba871bc66..a75804a774 100644 --- a/test_app/test/helper.lua +++ b/test_app/test/helper.lua @@ -47,6 +47,8 @@ helper.cluster = cartridge_helpers.Cluster:new({ local config = { ["executor_waiting_timeout"] = 200, ["executor_sharding_column"] = "bucket_id", + ["jaeger_agent_host"] = "127.0.0.1", + ["jaeger_agent_port"] = 6831, ["router_cache_capacity"] = 50, ["storage_cache_capacity"] = 200, ["storage_cache_size_bytes"] = 204800, -- GitLab