From 8147ac3ba15cd04a0db331593992c2670172e7e1 Mon Sep 17 00:00:00 2001
From: godzie44 <godzie@yandex.ru>
Date: Fri, 26 Jul 2024 12:16:19 +0300
Subject: [PATCH] feature(plugin): add metrics module for plugin sdk

Closes 798
---
 picoplugin/src/lib.rs              |  1 +
 picoplugin/src/metrics.rs          | 83 ++++++++++++++++++++++++++++++
 picoplugin/src/plugin/interface.rs | 20 +++++++
 src/lib.rs                         | 16 +++++-
 src/plugin/manager.rs              | 25 +++++++--
 test/int/test_plugin.py            | 18 +++++++
 test/testplug/src/lib.rs           | 10 ++++
 7 files changed, 167 insertions(+), 6 deletions(-)
 create mode 100644 picoplugin/src/metrics.rs

diff --git a/picoplugin/src/lib.rs b/picoplugin/src/lib.rs
index 29f1ff0b3c..34d25cc19e 100644
--- a/picoplugin/src/lib.rs
+++ b/picoplugin/src/lib.rs
@@ -3,6 +3,7 @@ pub mod error_code;
 pub mod internal;
 pub mod interplay;
 pub mod log;
+pub mod metrics;
 pub mod plugin;
 pub mod sql;
 pub mod system;
diff --git a/picoplugin/src/metrics.rs b/picoplugin/src/metrics.rs
new file mode 100644
index 0000000000..58499a7c28
--- /dev/null
+++ b/picoplugin/src/metrics.rs
@@ -0,0 +1,83 @@
+use crate::background::ServiceId;
+use abi_stable::std_types::RString;
+use std::collections::HashMap;
+use std::sync::OnceLock;
+use tarantool::fiber;
+
+pub type MetricsCallback = fn() -> String;
+
+// FIXME: this trampoline is not totally stable, cause Fn() layout may be changed across rust versions.
+// We persist this closure at plugin side and read at Picodata side, so we need a stable closure for this,
+// but there is not safe closures in `stable_abi` crate.
+type StableMetricsCallback = dyn Fn() -> RString + 'static;
+
+/// This component is using by `picodata` for manage all user defined metrics.
+///
+/// *For internal usage, don't use it in your code*.
+#[derive(Default)]
+pub struct InternalGlobalMetricsCollection {
+    metrics: fiber::Mutex<HashMap<ServiceId, Box<StableMetricsCallback>>>,
+}
+
+// SAFETY: `InternalGlobalMetricsCollection` must be used only in the tx thread
+unsafe impl Send for InternalGlobalMetricsCollection {}
+unsafe impl Sync for InternalGlobalMetricsCollection {}
+
+static MGM: OnceLock<InternalGlobalMetricsCollection> = OnceLock::new();
+
+impl InternalGlobalMetricsCollection {
+    /// Return reference to a global metrics collection.
+    pub fn instance() -> &'static Self {
+        let mgm_ref = MGM.get_or_init(InternalGlobalMetricsCollection::default);
+        mgm_ref
+    }
+
+    /// Remove metrics by service identifier.
+    pub fn remove(&self, service_id: &ServiceId) {
+        self.metrics.lock().remove(service_id);
+    }
+
+    /// Return joined metrics from all registered callbacks.
+    pub fn all_metrics(&self) -> String {
+        let mut result = String::new();
+        for cb in self.metrics.lock().values() {
+            let metrics = cb();
+            result += "\n";
+            result += metrics.as_str();
+        }
+        result
+    }
+}
+
+pub struct MetricsCollection<'a> {
+    plugin_name: &'a str,
+    plugin_version: &'a str,
+    service_name: &'a str,
+    global_collection: &'static InternalGlobalMetricsCollection,
+}
+
+impl<'a> MetricsCollection<'a> {
+    pub(crate) fn new(
+        plugin_name: &'a str,
+        plugin_version: &'a str,
+        service_name: &'a str,
+        global_collection: &'static InternalGlobalMetricsCollection,
+    ) -> Self {
+        Self {
+            plugin_name,
+            plugin_version,
+            service_name,
+            global_collection,
+        }
+    }
+
+    /// Append callback with stringified metrics representation to a global metrics collection.
+    /// This callback will be called at every metrics poll request (by request to a "/metrics" http endpoint).
+    pub fn append(&self, callback: MetricsCallback) {
+        let mut lock = self.global_collection.metrics.lock();
+        lock.insert(
+            ServiceId::new(self.plugin_name, self.service_name, self.plugin_version),
+            Box::new(move || callback().into()),
+        );
+    }
+}
diff --git a/picoplugin/src/plugin/interface.rs b/picoplugin/src/plugin/interface.rs
index f33589b027..e463b3bebd 100644
--- a/picoplugin/src/plugin/interface.rs
+++ b/picoplugin/src/plugin/interface.rs
@@ -6,6 +6,7 @@ use std::error::Error;
 use std::fmt::Display;
 
 use crate::background::{InternalGlobalWorkerManager, ServiceId, ServiceWorkerManager};
+use crate::metrics::{InternalGlobalMetricsCollection, MetricsCollection};
 pub use abi_stable;
 use abi_stable::pmr::{RErr, RResult, RSlice};
 use serde::de::DeserializeOwned;
@@ -17,6 +18,7 @@ use tarantool::error::{BoxError, IntoBoxError};
 pub struct PicoContext {
     is_master: bool,
     global_wm: *const (),
+    global_mc: *const (),
     pub plugin_name: FfiSafeStr,
     pub service_name: FfiSafeStr,
     pub plugin_version: FfiSafeStr,
@@ -27,10 +29,13 @@ impl PicoContext {
     pub fn new(is_master: bool) -> PicoContext {
         let gwm = InternalGlobalWorkerManager::instance() as *const InternalGlobalWorkerManager
             as *const ();
+        let mgc = InternalGlobalMetricsCollection::instance()
+            as *const InternalGlobalMetricsCollection as *const ();
 
         Self {
             is_master,
             global_wm: gwm,
+            global_mc: mgc,
             plugin_name: "<unset>".into(),
             service_name: "<unset>".into(),
             plugin_version: "<unset>".into(),
@@ -44,6 +49,7 @@ impl PicoContext {
         Self {
             is_master: self.is_master,
             global_wm: self.global_wm,
+            global_mc: self.global_mc,
             plugin_name: self.plugin_name.clone(),
             service_name: self.service_name.clone(),
             plugin_version: self.plugin_version.clone(),
@@ -71,6 +77,20 @@ impl PicoContext {
         global_manager.get_or_init_manager(service_id)
     }
 
+    /// Return [`MetricsCollection`] for current service.
+    pub fn metrics_collection(&self) -> MetricsCollection {
+        let global_collection: &'static InternalGlobalMetricsCollection =
+            // SAFETY: `picodata` guaranty that this reference live enough
+            unsafe { &*(self.global_mc as *const InternalGlobalMetricsCollection) };
+
+        MetricsCollection::new(
+            self.plugin_name(),
+            self.plugin_version(),
+            self.service_name(),
+            global_collection,
+        )
+    }
+
     #[inline]
     pub fn plugin_name(&self) -> &str {
         // SAFETY: safe because lifetime is managed by borrow checker
diff --git a/src/lib.rs b/src/lib.rs
index 4d20b3f285..866951559a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -227,8 +227,20 @@ fn start_http_server(Address { host, port, .. }: &Address) {
         }),
     )
     .expect("failed to add route api/v1/cluster to http server");
-    lua.exec(
-        "pico.httpd:route({method = 'GET', path = 'metrics' }, require('metrics.plugins.prometheus').collect_http)",
+
+    lua.exec_with(
+        r#"
+        local user_metrics = ... 
+        pico.httpd:route({method = 'GET', path = 'metrics' }, function() 
+        local resp = require('metrics.plugins.prometheus').collect_http() 
+        resp.body = resp.body .. user_metrics() 
+        return resp
+        end)"#,
+        tlua::Function::new(|| -> _ {
+            let collection = picoplugin::metrics::InternalGlobalMetricsCollection::instance();
+            let metrics = collection.all_metrics();
+            metrics
+        }),
     )
     .expect("failed to add route metrics to http server");
 }
diff --git a/src/plugin/manager.rs b/src/plugin/manager.rs
index fdac5bc64a..c73b56805e 100644
--- a/src/plugin/manager.rs
+++ b/src/plugin/manager.rs
@@ -15,6 +15,7 @@ use crate::traft::node::Node;
 use crate::{tlog, traft, warn_or_panic};
 use abi_stable::derive_macro_reexports::{RErr, RResult, RSlice};
 use picoplugin::background::{Error, InternalGlobalWorkerManager, ServiceId};
+use picoplugin::metrics::InternalGlobalMetricsCollection;
 use picoplugin::plugin::interface::{PicoContext, ServiceRegistry};
 use picoplugin::util::DisplayErrorLocation;
 use std::collections::HashMap;
@@ -310,8 +311,9 @@ impl PluginManager {
         };
 
         if let Some(plugin_state) = plugin {
-            // stop all background jobs first
+            // stop all background jobs and remove metrics first
             stop_background_jobs(std::iter::once(&plugin_state.services));
+            remove_metrics(std::iter::once(&plugin_state.services));
 
             for service in plugin_state.services.iter() {
                 let mut service = service.lock();
@@ -331,8 +333,9 @@ impl PluginManager {
         let plugins = self.plugins.lock();
         let services_to_stop = plugins.values().map(|state| &state.services);
 
-        // stop all background jobs first
+        // stop all background jobs and remove metrics first
         stop_background_jobs(services_to_stop.clone());
+        remove_metrics(services_to_stop.clone());
 
         for services in services_to_stop {
             for service in services.iter() {
@@ -587,8 +590,9 @@ impl PluginManager {
         let service_to_del = state.services.swap_remove(svc_idx);
         drop(plugins);
 
-        // stop all background jobs first
+        // stop all background jobs and remove metrics first
         stop_background_jobs(std::iter::once(&vec![service_to_del.clone()]));
+        remove_metrics(std::iter::once(&vec![service_to_del.clone()]));
 
         // call `on_stop` callback and drop service
         let mut service = service_to_del.lock();
@@ -651,8 +655,9 @@ impl PluginManager {
         };
 
         if let Some(plugin_state) = maybe_plugin_state {
-            // stop all background jobs first
+            // stop all background jobs and remove metrics first
             stop_background_jobs(std::iter::once(&plugin_state.services));
+            remove_metrics(std::iter::once(&plugin_state.services));
 
             for service in plugin_state.services.iter() {
                 let mut service = service.lock();
@@ -890,3 +895,15 @@ fn stop_background_jobs<'a>(plugins: impl Iterator<Item = &'a PluginServices>) {
         fiber.join();
     }
 }
+
+fn remove_metrics<'a>(plugins: impl Iterator<Item = &'a PluginServices>) {
+    for services in plugins {
+        for service in services.iter() {
+            let lock = service.lock();
+            let svc_id = ServiceId::new(&lock.plugin_name, &lock.name, &lock.version);
+            drop(lock);
+
+            InternalGlobalMetricsCollection::instance().remove(&svc_id)
+        }
+    }
+}
diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py
index b086d38bd0..dbc837d4e5 100644
--- a/test/int/test_plugin.py
+++ b/test/int/test_plugin.py
@@ -13,6 +13,7 @@ from conftest import (
     log_crawler,
 )
 from decimal import Decimal
+import requests  # type: ignore
 from conftest import (
     ErrorCode,
 )
@@ -2110,6 +2111,23 @@ def test_sdk_log(cluster: Cluster):
     assert crawler.matched
 
 
+@pytest.mark.webui
+def test_sdk_metrics(instance: Instance):
+    http_listen = instance.env["PICODATA_HTTP_LISTEN"]
+    install_and_enable_plugin(
+        instance,
+        _PLUGIN_W_SDK,
+        _PLUGIN_W_SDK_SERVICES,
+        migrate=True,
+        default_config={"test_type": "metrics"},
+    )
+
+    response = requests.get(f"http://{http_listen}/metrics")
+    assert response.ok
+    assert "test_metric_1 1" in response.text
+    assert "test_metric_2 2" in response.text
+
+
 def test_sdk_background(cluster: Cluster):
     [i1] = cluster.deploy(instance_count=1)
 
diff --git a/test/testplug/src/lib.rs b/test/testplug/src/lib.rs
index 118ad6d8ba..59d92ba164 100644
--- a/test/testplug/src/lib.rs
+++ b/test/testplug/src/lib.rs
@@ -18,6 +18,7 @@ use picoplugin::{internal, log, system};
 use serde::{Deserialize, Serialize};
 use std::cell::Cell;
 use std::fmt::Display;
+use std::process::exit;
 use std::rc::Rc;
 use std::str::FromStr;
 use std::sync;
@@ -393,6 +394,15 @@ impl Service for Service3 {
                 wm.register_job(my_job).unwrap();
             }
             "no_test" => {}
+            "metrics" => {
+                let collection = ctx.metrics_collection();
+                collection.append(|| {
+                    String::from(
+                        r#"test_metric_1 1
+test_metric_2 2"#,
+                    )
+                });
+            }
             _ => {
                 panic!("invalid test type")
             }
-- 
GitLab