diff --git a/src/executor/engine/cartridge.rs b/src/executor/engine/cartridge.rs index e74baac6c6a5594b08b8b0ce5dd7e459a03afa58..d8b6066a39e2503832e05c41a9f85fe65deba83f 100644 --- a/src/executor/engine/cartridge.rs +++ b/src/executor/engine/cartridge.rs @@ -14,15 +14,14 @@ use opentelemetry::sdk::propagation::{TextMapCompositePropagator, TraceContextPr static SERVICE_NAME: &str = "sbroad"; /// Update the opentelemetry global trace provider and tracer. -/// Use `OTEL_EXPORTER_JAEGER_AGENT_HOST` and `OTEL_EXPORTER_JAEGER_AGENT_PORT` -/// environment variables to configure the Jaeger agent's endpoint. /// /// # Errors /// - failed to build OTM global trace provider -pub fn update_tracing() -> Result<(), QueryPlannerError> { +pub fn update_tracing(host: &str, port: u16) -> Result<(), QueryPlannerError> { let propagator = TextMapCompositePropagator::new(vec![Box::new(TraceContextPropagator::new())]); set_text_map_propagator(propagator); let provider = opentelemetry_jaeger::new_pipeline() + .with_agent_endpoint(&format!("{}:{}", host, port)) .with_service_name(SERVICE_NAME) .build_simple() .map_err(|e| { diff --git a/src/executor/engine/cartridge/config.rs b/src/executor/engine/cartridge/config.rs index a262c6ce8d1fc6d2d387cb72b4610812573582a7..cd8616a4a214dfb3b8b06a98d6795bb46eb33c1e 100644 --- a/src/executor/engine/cartridge/config.rs +++ b/src/executor/engine/cartridge/config.rs @@ -29,6 +29,12 @@ pub struct RouterConfiguration { /// Sharding column names. sharding_column: String, + /// Jaeger agent host. + jaeger_agent_host: String, + + /// Jaeger agent port. + jaeger_agent_port: u16, + /// IR table segments from the cluster spaces tables: HashMap<String, Table>, } @@ -45,6 +51,8 @@ impl RouterConfiguration { RouterConfiguration { waiting_timeout: 360, cache_capacity: DEFAULT_CAPACITY, + jaeger_agent_host: "localhost".to_string(), + jaeger_agent_port: 6831, tables: HashMap::new(), sharding_column: String::new(), } @@ -201,6 +209,24 @@ impl RouterConfiguration { Ok(()) } + #[must_use] + pub fn get_jaeger_agent_host(&self) -> &str { + self.jaeger_agent_host.as_str() + } + + #[must_use] + pub fn get_jaeger_agent_port(&self) -> u16 { + self.jaeger_agent_port + } + + pub fn set_jaeger_agent_host(&mut self, host: String) { + self.jaeger_agent_host = host; + } + + pub fn set_jaeger_agent_port(&mut self, port: u16) { + self.jaeger_agent_port = port; + } + /// Setup response waiting timeout for executor pub fn set_waiting_timeout(&mut self, timeout: u64) { if timeout > 0 { @@ -274,6 +300,10 @@ pub struct StorageConfiguration { /// If a new statement is bigger doesn't fit into the cache, /// it would not be cached but executed directly. pub storage_size_bytes: usize, + /// Jaeger agent host + pub jaeger_agent_host: String, + /// Jaeger agent port + pub jaeger_agent_port: u16, } impl Default for StorageConfiguration { @@ -288,6 +318,8 @@ impl StorageConfiguration { StorageConfiguration { storage_capacity: 0, storage_size_bytes: 0, + jaeger_agent_host: "localhost".to_string(), + jaeger_agent_port: 6831, } } diff --git a/src/executor/engine/cartridge/router.rs b/src/executor/engine/cartridge/router.rs index cbcfbc61d6b23b22f2bf25880f128f8b7529a6fe..66f17c82ea324ea3068eead868bf24695c50e0fa 100644 --- a/src/executor/engine/cartridge/router.rs +++ b/src/executor/engine/cartridge/router.rs @@ -59,6 +59,7 @@ impl Configuration for RouterRuntime { self.metadata.is_empty() } + #[allow(clippy::too_many_lines)] fn get_config(&self) -> Result<Option<Self::Configuration>, QueryPlannerError> { if self.is_config_empty() { let lua = tarantool::lua_state(); @@ -78,6 +79,38 @@ impl Configuration for RouterRuntime { } }; + let jaeger_agent_host: LuaFunction<_> = + lua.eval("return get_jaeger_agent_host;").unwrap(); + let jaeger_host: String = match jaeger_agent_host.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent host"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + + let jaeger_agent_port: LuaFunction<_> = + lua.eval("return get_jaeger_agent_port;").unwrap(); + let jaeger_port: u16 = match jaeger_agent_port.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent port"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + let waiting_timeout: LuaFunction<_> = lua.eval("return get_waiting_timeout;").unwrap(); let timeout: u64 = match waiting_timeout.call() { Ok(res) => res, @@ -133,12 +166,17 @@ impl Configuration for RouterRuntime { }; let mut metadata = RouterConfiguration::new(); + metadata.set_jaeger_agent_host(jaeger_host); + metadata.set_jaeger_agent_port(jaeger_port); metadata.set_waiting_timeout(timeout); metadata.set_cache_capacity(router_capacity); metadata.set_sharding_column(normalize_name_from_schema(column.as_str())); // We should always load the schema **after** setting the sharding column. metadata.load_schema(&schema)?; - update_tracing()?; + update_tracing( + metadata.get_jaeger_agent_host(), + metadata.get_jaeger_agent_port(), + )?; return Ok(Some(metadata)); } diff --git a/src/executor/engine/cartridge/storage.rs b/src/executor/engine/cartridge/storage.rs index e355a3fcd26533f14b6f8a174277df947b1b0784..0f87c523d3d684a8eba1a8e1620d35249f9e6863 100644 --- a/src/executor/engine/cartridge/storage.rs +++ b/src/executor/engine/cartridge/storage.rs @@ -109,10 +109,44 @@ impl Configuration for StorageRuntime { let storage_size_bytes = usize::try_from(cache_size_bytes) .map_err(|e| QueryPlannerError::CustomError(format!("{}", e)))?; + let jaeger_agent_host: LuaFunction<_> = + lua.eval("return get_jaeger_agent_host;").unwrap(); + let jaeger_host: String = match jaeger_agent_host.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent host"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + + let jaeger_agent_port: LuaFunction<_> = + lua.eval("return get_jaeger_agent_port;").unwrap(); + let jaeger_port: u16 = match jaeger_agent_port.call() { + Ok(res) => res, + Err(e) => { + say( + SayLevel::Error, + file!(), + line!().try_into().unwrap_or(0), + Option::from("getting jaeger agent port"), + &format!("{:?}", e), + ); + return Err(QueryPlannerError::LuaError(format!("Lua error: {:?}", e))); + } + }; + let mut metadata = StorageConfiguration::new(); metadata.storage_capacity = storage_capacity; metadata.storage_size_bytes = storage_size_bytes; - update_tracing()?; + metadata.jaeger_agent_host = jaeger_host; + metadata.jaeger_agent_port = jaeger_port; + update_tracing(&metadata.jaeger_agent_host, metadata.jaeger_agent_port)?; return Ok(Some(metadata)); } diff --git a/src/router.lua b/src/router.lua index 36c16746a78a3de067ceb5ae9d2a39898ef01b05..bccdd5ffc97e5aca959de0099be9aa7828d56057 100644 --- a/src/router.lua +++ b/src/router.lua @@ -8,6 +8,26 @@ _G.get_schema = function() return cartridge.get_schema() end +_G.get_jaeger_agent_host = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_host"] == nil then + return "localhost" + end + + return cfg["jaeger_agent_host"] +end + +_G.get_jaeger_agent_port = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_port"] == nil then + return 6831 + end + + return cfg["jaeger_agent_port"] +end + _G.get_waiting_timeout = function() local cfg = cartridge.config_get_readonly() diff --git a/src/storage.lua b/src/storage.lua index dd79b409a7f32b75268199fef4196931224b4f89..074cbec3a70689e2cc0835171c01ac4269068048 100644 --- a/src/storage.lua +++ b/src/storage.lua @@ -22,6 +22,26 @@ _G.get_storage_cache_size_bytes = function() return cfg["storage_cache_size_bytes"] end +_G.get_jaeger_agent_host = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_host"] == nil then + return "localhost" + end + + return cfg["jaeger_agent_host"] +end + +_G.get_jaeger_agent_port = function() + local cfg = cartridge.config_get_readonly() + + if cfg["jaeger_agent_port"] == nil then + return 6831 + end + + return cfg["jaeger_agent_port"] +end + _G.prepare = function(pattern) local prep, err = box.prepare(pattern) if err ~= nil then diff --git a/test_app/test/helper.lua b/test_app/test/helper.lua index 0ba871bc669da814c1824ae4ab7485e5c208cad4..a75804a774941547c540ab1b419badacd33769e5 100644 --- a/test_app/test/helper.lua +++ b/test_app/test/helper.lua @@ -47,6 +47,8 @@ helper.cluster = cartridge_helpers.Cluster:new({ local config = { ["executor_waiting_timeout"] = 200, ["executor_sharding_column"] = "bucket_id", + ["jaeger_agent_host"] = "127.0.0.1", + ["jaeger_agent_port"] = 6831, ["router_cache_capacity"] = 50, ["storage_cache_capacity"] = 200, ["storage_cache_size_bytes"] = 204800,