Skip to content
Snippets Groups Projects
Commit f3bcf2ce authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: move plugin metrics storage into plugin_manager

parent aefe99ae
No related branches found
No related tags found
No related merge requests found
......@@ -292,7 +292,6 @@ fn start_webui() {
/// (discovery, rpc, public proc api).
fn init_handlers() {
plugin::rpc::server::init_handlers();
plugin::metrics::init_handlers();
rpc::init_static_proc_set();
......
......@@ -19,6 +19,7 @@ use abi_stable::derive_macro_reexports::{RErr, RResult, RSlice};
use abi_stable::std_types::RStr;
use picodata_plugin::background::{Error, InternalGlobalWorkerManager};
use picodata_plugin::error_code::ErrorCode::PluginError as PluginErrorCode;
use picodata_plugin::metrics::FfiMetricsHandler;
use picodata_plugin::plugin::interface::FnServiceRegistrar;
use picodata_plugin::plugin::interface::ServiceId;
use picodata_plugin::plugin::interface::{PicoContext, ServiceRegistry};
......@@ -88,6 +89,8 @@ impl PluginState {
}
}
type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>;
pub struct PluginManager {
/// List of pairs (plugin name -> plugin state).
///
......@@ -100,8 +103,14 @@ pub struct PluginManager {
/// Queue of async events.
events_queue: Option<fiber::channel::Channel<PluginAsyncEvent>>,
/// Plugin-defined metrics callbacks.
pub(crate) metrics_handlers: fiber::Mutex<MetricsHandlerMap>,
/// Fiber for handling async events; those handlers need it to avoid yields.
_loop: Option<fiber::JoinHandle<'static, ()>>,
/// Id is only stored for debugging.
#[allow(dead_code)]
async_event_fiber_id: fiber::FiberId,
}
impl PluginManager {
......@@ -114,18 +123,19 @@ impl PluginManager {
let options = WorkerOptions::default();
let pool = ConnectionPool::new(storage, options);
let r#loop = Loop::new(plugins.clone());
let defer_events_fiber = fiber::Builder::new()
let async_event_fiber_id = fiber::Builder::new()
.name("plugin_manager_loop")
.func(move || r#loop.run(tx))
.defer()
.expect("Plugin manager fiber should not fail");
.func(move || plugin_manager_async_event_loop(tx))
.defer_non_joinable()
.expect("Plugin manager fiber should not fail")
.expect("fiber id is supported");
Self {
plugins,
pool,
events_queue: Some(rx),
_loop: defer_events_fiber.into(),
metrics_handlers: Default::default(),
async_event_fiber_id,
}
}
......@@ -366,23 +376,41 @@ impl PluginManager {
Ok(())
}
/// Dispatches one queued [`PluginAsyncEvent`] to the matching handler.
///
/// A handler failure is only written to the log; it is never propagated,
/// so as written this method always returns `Ok(())`.
fn handle_async_event(&self, event: PluginAsyncEvent) -> traft::Result<()> {
    match event {
        PluginAsyncEvent::PluginDisabled { name } => {
            if let Err(e) = self.handle_plugin_disabled(&name) {
                tlog!(Error, "plugin {name} remove error: {e}");
            }
        }
        PluginAsyncEvent::ServiceConfigurationUpdated {
            ident,
            service,
            old_raw,
            new_raw,
        } => {
            if let Err(e) = self.handle_config_updated(&ident, &service, &old_raw, &new_raw) {
                tlog!(Error, "plugin {ident} service {service}, apply new plugin configuration error: {e}");
            }
        }
    }

    Ok(())
}
/// Call the `on_stop` callback of each service and remove the plugin from the managed plugins.
fn handle_plugin_disabled(
plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>,
plugin_name: &str,
) -> traft::Result<()> {
fn handle_plugin_disabled(&self, plugin_name: &str) -> traft::Result<()> {
let node = node::global().expect("node must be already initialized");
let ctx = context_from_node(node);
let plugin = {
let mut lock = plugins.lock();
lock.remove(plugin_name)
};
let plugin = self.plugins.lock().remove(plugin_name);
if let Some(plugin_state) = plugin {
// stop all background jobs and remove metrics first
stop_background_jobs(&plugin_state.services);
remove_metrics(&plugin_state.services);
self.remove_metrics_handlers(&plugin_state.services);
for service in plugin_state.services.iter() {
stop_service(service, &ctx);
......@@ -402,7 +430,7 @@ impl PluginManager {
// stop all background jobs and remove metrics first
stop_background_jobs(services_to_stop.clone());
remove_metrics(services_to_stop.clone());
self.remove_metrics_handlers(services_to_stop.clone());
for service in services_to_stop {
stop_service(service, &ctx);
......@@ -411,7 +439,7 @@ impl PluginManager {
/// Call `on_config_change` on services. Poison a service if its callback returns an error.
fn handle_config_updated(
plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>,
&self,
plugin_identity: &PluginIdentifier,
service: &str,
old_cfg_raw: &[u8],
......@@ -422,7 +450,7 @@ impl PluginManager {
let storage = &node.storage;
let maybe_service = {
let lock = plugins.lock();
let lock = self.plugins.lock();
lock.get(&plugin_identity.name)
.and_then(|plugin_state| {
plugin_state
......@@ -659,7 +687,7 @@ impl PluginManager {
// stop all background jobs and remove metrics first
stop_background_jobs(&[service_to_del.clone()]);
remove_metrics(&[service_to_del.clone()]);
self.remove_metrics_handlers(&[service_to_del.clone()]);
// call `on_stop` callback and drop service
stop_service(&service_to_del, &ctx);
......@@ -731,7 +759,7 @@ impl PluginManager {
if let Some(plugin_state) = maybe_plugin_state {
// stop all background jobs and remove metrics first
stop_background_jobs(&plugin_state.services);
remove_metrics(&plugin_state.services);
self.remove_metrics_handlers(&plugin_state.services);
for service in plugin_state.services.iter() {
stop_service(&service, &ctx);
......@@ -874,7 +902,7 @@ impl PluginManager {
/// # Arguments
///
/// * `event`: queued event
pub fn handle_event_async(&self, event: PluginAsyncEvent) -> Result<()> {
pub fn add_async_event_to_queue(&self, event: PluginAsyncEvent) -> Result<()> {
self.events_queue
.as_ref()
.expect("infallible")
......@@ -882,6 +910,20 @@ impl PluginManager {
.map_err(|_| PluginError::AsyncEventQueueFull)
}
/// Drops the registered metrics handler of every service in `services`.
///
/// The `metrics_handlers` lock is taken once and held for the whole pass.
/// A log line is emitted only for services that actually had a handler.
fn remove_metrics_handlers<'a>(
    &self,
    services: impl IntoIterator<Item = &'a Rc<ServiceState>>,
) {
    let mut handlers = self.metrics_handlers.lock();
    for service in services {
        let id = &service.id;
        // Keep the removed handler bound until after the log line so that
        // it is released at the same point as before.
        if let Some(_removed) = handlers.remove(id) {
            tlog!(Info, "unregistered metrics handler for `{id}`");
        }
    }
}
pub fn get_service_state(&self, id: &ServiceId) -> Option<Rc<ServiceState>> {
let plugins = self.plugins.lock();
let plugin = plugins.get(id.plugin())?;
......@@ -899,10 +941,6 @@ impl Drop for PluginManager {
/// Closes the async event queue and, if a loop fiber handle is stored,
/// joins that fiber.
///
/// # Panics
///
/// Panics if `events_queue` is already `None` (the `unwrap` below).
fn drop(&mut self) {
let event_queue = self.events_queue.take();
event_queue.unwrap().close();
// NOTE(review): presumably closing the channel makes the event loop's
// `recv` return `None`, letting the fiber finish before it is joined — confirm.
if let Some(r#loop) = self._loop.take() {
r#loop.join();
}
}
}
......@@ -937,50 +975,12 @@ fn stop_service(service: &ServiceState, context: &PicoContext) {
}
/// Plugin manager inner loop, used for handling async events (must be run in a separate fiber).
struct Loop {
/// List of pairs (plugin name -> plugin state).
///
/// Two mutexes are used here to avoid the situation where one long service
/// callback blocks other services.
plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>,
}
impl Loop {
/// Creates a loop that works on the given shared plugin map.
pub fn new(plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>) -> Self {
Self { plugins }
}
/// Routes one async event to the corresponding `PluginManager` handler,
/// passing it a clone of the shared plugin map.
///
/// Handler errors are logged and swallowed; as written the result is
/// always `Ok(())`.
fn handle_event(&self, event: PluginAsyncEvent) -> traft::Result<()> {
    match event {
        PluginAsyncEvent::ServiceConfigurationUpdated {
            ident,
            service,
            old_raw,
            new_raw,
        } => {
            let outcome = PluginManager::handle_config_updated(
                self.plugins.clone(),
                &ident,
                &service,
                &old_raw,
                &new_raw,
            );
            if let Err(e) = outcome {
                tlog!(Error, "plugin {ident} service {service}, apply new plugin configuration error: {e}");
            }
        }
        PluginAsyncEvent::PluginDisabled { name } => {
            let outcome = PluginManager::handle_plugin_disabled(self.plugins.clone(), &name);
            if let Err(e) = outcome {
                tlog!(Error, "plugin {name} remove error: {e}");
            }
        }
    }

    Ok(())
}
fn run(&self, event_chan: fiber::channel::Channel<PluginAsyncEvent>) {
while let Some(event) = event_chan.recv() {
if let Err(e) = self.handle_event(event) {
tlog!(Error, "plugin async event handler error: {e}");
}
/// Body of the plugin manager's async event fiber.
///
/// Receives events from `event_chan` until `recv` yields `None`, handing
/// each one to `handle_async_event` on the global node's plugin manager.
/// Errors returned by the handler are logged and the loop continues.
///
/// # Panics
///
/// Panics if `node::global()` fails at the moment an event is received.
fn plugin_manager_async_event_loop(event_chan: fiber::channel::Channel<PluginAsyncEvent>) {
    loop {
        let Some(event) = event_chan.recv() else {
            break;
        };

        // The global node is looked up anew for every event.
        let node =
            node::global().expect("initialized by the time plugin async events start happening");
        let outcome = node.plugin_manager.handle_async_event(event);
        if let Err(e) = outcome {
            tlog!(Error, "plugin async event handler error: {e}");
        }
    }
}
......@@ -1011,9 +1011,3 @@ fn stop_background_jobs<'a>(services: impl IntoIterator<Item = &'a Rc<ServiceSta
fiber.join();
}
}
/// Unregisters the metrics handler of every service yielded by `plugins`.
fn remove_metrics<'a>(plugins: impl IntoIterator<Item = &'a Rc<ServiceState>>) {
    plugins
        .into_iter()
        .for_each(|service| crate::plugin::metrics::unregister_metrics_handler(&service.id));
}
use crate::tlog;
use crate::traft::node;
use picodata_plugin::metrics::FfiMetricsHandler;
use picodata_plugin::plugin::interface::ServiceId;
use picodata_plugin::util::RegionGuard;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::rc::Rc;
use tarantool::error::BoxError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber::mutex::MutexGuard;
use tarantool::fiber::Mutex;
static mut HANDLERS: Option<Mutex<MetricsHandlerMap>> = None;
type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>;
/// Initializes the global `HANDLERS` storage with an empty handler map.
///
/// Whatever `HANDLERS` held before is overwritten.
pub(crate) fn init_handlers() {
// NOTE(review): this writes a `static mut`; presumably sound because it is
// only ever touched from the tx thread — confirm.
unsafe {
HANDLERS = Some(Mutex::new(HashMap::new()));
}
}
/// Locks the global metrics handler map and returns the guard.
///
/// # Panics
///
/// Panics if `HANDLERS` has not been set to `Some` yet.
#[inline]
fn handlers() -> MutexGuard<'static, MetricsHandlerMap> {
    // SAFETY: global variable access: safe in tx thread.
    let registry = unsafe { HANDLERS.as_ref() };
    let mutex = match registry {
        Some(mutex) => mutex,
        None => panic!("should be initialized at startup"),
    };
    mutex.lock()
}
pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxError> {
let identifier = &handler.identifier;
......@@ -50,7 +30,8 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr
return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty"));
}
let mut handlers = handlers();
let node = node::global()?;
let mut handlers = node.plugin_manager.metrics_handlers.lock();
let service_id = identifier.service_id();
let entry = handlers.entry(service_id);
......@@ -76,20 +57,16 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr
Ok(())
}
/// Removes the metrics handler registered for `service_id`, if there is one,
/// and logs the removal. Does nothing when no handler is registered.
pub fn unregister_metrics_handler(service_id: &ServiceId) {
// The guard returned by `handlers()` is a temporary that is dropped at the
// end of this statement, so the lock is not held while logging.
let handler = handlers().remove(service_id);
if handler.is_some() {
tlog!(Info, "unregistered metrics handler for `{service_id}`");
}
}
pub fn get_plugin_metrics() -> String {
let _guard = RegionGuard::new();
let mut res = String::new();
let handlers = handlers();
let Ok(node) = node::global() else {
return "".into();
};
let handlers = node.plugin_manager.metrics_handlers.lock();
// Note: must first copy all references to a local vec, because the callbacks may yield.
let handler_copies: Vec<_> = handlers.values().cloned().collect();
// Release the lock.
......
......@@ -1331,7 +1331,7 @@ impl NodeImpl {
if let Err(e) = self
.plugin_manager
.handle_event_async(PluginAsyncEvent::PluginDisabled { name: ident.name })
.add_async_event_to_queue(PluginAsyncEvent::PluginDisabled { name: ident.name })
{
tlog!(Warning, "async plugin event: {e}");
}
......@@ -1431,7 +1431,7 @@ impl NodeImpl {
let old_cfg_raw =
rmp_serde::encode::to_vec_named(&old_cfg).expect("out of memory");
if let Err(e) = self.plugin_manager.handle_event_async(
if let Err(e) = self.plugin_manager.add_async_event_to_queue(
PluginAsyncEvent::ServiceConfigurationUpdated {
ident: ident.clone(),
service: svc.name,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment