From cad0e1b41ca2b52390cc2e2ed024c474bb872876 Mon Sep 17 00:00:00 2001
From: Feodor Alexandrov <feodor.alex.alexandrov@gmail.com>
Date: Fri, 10 Nov 2023 18:13:02 +0300
Subject: [PATCH] wip: rpc, sql, picodata_types and on_grade_changed callbacks

---
 Cargo.lock                     |  12 +++
 Cargo.toml                     |   2 +
 picodata-types/Cargo.toml      |  16 ++++
 picodata-types/src/lib.rs      | 131 +++++++++++++++++++++++++++++++++
 sdk/picodata-sdk/Cargo.toml    |   7 ++
 sdk/picodata-sdk/src/lib.rs    |  36 ++++-----
 sdk/plugin-example/Cargo.toml  |   1 -
 sdk/plugin-example/src/lib.rs  |  72 +++++++++++++++++-
 sdk/plugin-example2/src/lib.rs |   4 +-
 src/cas.rs                     |   2 +-
 src/discovery.rs               |   2 +-
 src/lib.rs                     |  10 +--
 src/on_shutdown.rs             |   7 +-
 src/plugin.rs                  |  95 +++++++++++++++++++++---
 src/rpc/ddl_apply.rs           |   2 +-
 src/rpc/expel.rs               |   4 +-
 src/rpc/join.rs                |   2 +-
 src/rpc/mod.rs                 |  86 +---------------------
 src/rpc/replication.rs         |   4 +-
 src/rpc/sharding.rs            |   4 +-
 src/rpc/snapshot.rs            |   2 +-
 src/rpc/update_instance.rs     |   7 +-
 src/sync.rs                    |  10 +--
 src/tarantool.rs               |  22 ------
 src/traft/network.rs           |   4 +-
 src/traft/node.rs              |   2 +-
 test/conftest.py               |   6 ++
 test/int/test_basics.py        |  28 +++++++
 28 files changed, 410 insertions(+), 170 deletions(-)
 create mode 100644 picodata-types/Cargo.toml
 create mode 100644 picodata-types/src/lib.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9164c00847..e72a47990a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1361,6 +1361,7 @@ dependencies = [
  "nix",
  "once_cell",
  "picodata-sdk",
+ "picodata-types",
  "pretty_assertions 0.6.1",
  "protobuf",
  "raft",
@@ -1387,6 +1388,17 @@ name = "picodata-sdk"
 version = "0.1.0"
 dependencies = [
  "libloading 0.8.1",
+ "picodata-types",
+ "tarantool",
+]
+
+[[package]]
+name = "picodata-types"
+version = "0.1.0"
+dependencies = [
+ "libc",
+ "serde",
+ "tarantool",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index b5c8769cae..3f57122132 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,7 @@ linkme = "0.3.14"
 libloading = "0.8"
 abi_stable = "0.11.2"
 
+picodata-types = {path = "./picodata-types"}
 picodata-sdk = {path = "./sdk/picodata-sdk"}
 
 [dependencies.protobuf]
@@ -83,6 +84,7 @@ harness = false
 [workspace]
 members = [
         ".",
+        "picodata-types",
         "sdk/picodata-sdk", # picodata / plugins etc. it is temporary naming
         "sdk/plugin-example",
         "sdk/plugin-example2",
diff --git a/picodata-types/Cargo.toml b/picodata-types/Cargo.toml
new file mode 100644
index 0000000000..d0fd7ac906
--- /dev/null
+++ b/picodata-types/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "picodata-types"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde = "1.0"
+libc = "0.2.108"
+
+[dependencies.tarantool]
+# path = "./tarantool/tarantool"
+git = "https://git.picodata.io/picodata/picodata/tarantool-module.git"
+version = "4.0"
+features = ["picodata", "test"]
diff --git a/picodata-types/src/lib.rs b/picodata-types/src/lib.rs
new file mode 100644
index 0000000000..81db247890
--- /dev/null
+++ b/picodata-types/src/lib.rs
@@ -0,0 +1,131 @@
+use ::tarantool::network::AsClient as _;
+use ::tarantool::network::Client;
+use ::tarantool::tuple::{DecodeOwned, Encode};
+
+use std::fmt::Debug;
+use std::io;
+
+use serde::de::DeserializeOwned;
+// pub extern crate libc;
+// pub extern crate serde;
+// pub extern crate tarantool;
+
+/// Types implementing this trait represent an RPC's (remote procedure call)
+/// arguments. This trait contains information about the request.
+pub trait RequestArgs: Encode + DecodeOwned {
+    /// Remote procedure name.
+    const PROC_NAME: &'static str;
+
+    /// Describes data returned from a successful RPC request.
+    type Response: Encode + DeserializeOwned + Debug + 'static;
+}
+
+/// Invoke remote procedure call on an instance specified by `address`.
+pub async fn network_call<R>(address: &str, request: &R) -> ::tarantool::Result<R::Response>
+where
+    R: RequestArgs,
+{
+    // TODO: move address parsing into client
+    let (address, port) = address.rsplit_once(':').ok_or_else(|| {
+        ::tarantool::error::Error::IO(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("invalid address: {}", address),
+        ))
+    })?;
+    let port: u16 = port.parse().map_err(|err| {
+        ::tarantool::error::Error::IO(io::Error::new(io::ErrorKind::InvalidInput, err))
+    })?;
+    let client = Client::connect(address, port).await?;
+    let tuple = client.call(R::PROC_NAME, request).await?;
+    tuple.decode().map(|((res,),)| res)
+}
+
+#[macro_export]
+macro_rules! stringify_last_token {
+    ($tail:tt) => { std::stringify!($tail) };
+    ($head:tt $($tail:tt)+) => { $crate::stringify_last_token!($($tail)+) };
+}
+
+/// Checks that the given function exists and returns it's name suitable for
+/// calling it via tarantool rpc.
+///
+/// The argument can be a full path to the function.
+#[macro_export]
+macro_rules! stringify_cfunc {
+    ( $($func_name:tt)+ ) => {{
+        use ::tarantool::tuple::FunctionArgs;
+        use ::tarantool::tuple::FunctionCtx;
+        use libc::c_int;
+
+        const _: unsafe extern "C" fn(FunctionCtx, FunctionArgs) -> c_int = $($func_name)+;
+        concat!(".", $crate::stringify_last_token!($($func_name)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! define_rpc_request {
+    (
+        $(#[$proc_meta:meta])*
+        fn $proc:ident($_r:ident: $_request:ty) -> $result:ty {
+            $($proc_body:tt)*
+        }
+
+        $(#[$req_meta:meta])*
+        pub $req_record:tt $request:ident
+        $({ $($req_named_fields:tt)* })?
+        $(( $($req_unnamed_fields:tt)* );)?
+
+        $(#[$res_meta:meta])*
+        pub $res_record:tt  $response:ident
+        $({ $($res_named_fields:tt)* })?
+        $(( $($res_unnamed_fields:tt)* );)?
+    ) => {
+        $(#[$proc_meta])*
+        #[tarantool::proc(packed_args)]
+        fn $proc($_r: $_request) -> $result {
+            $($proc_body)*
+        }
+
+        impl tarantool::tuple::Encode for $request {}
+        #[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)]
+        $(#[$req_meta])*
+        pub $req_record $request
+        $({ $($req_named_fields)* })?
+        $(( $($req_unnamed_fields)* );)?
+
+        impl tarantool::tuple::Encode for $response {}
+        #[derive(Clone, Debug, ::serde::Serialize, serde::Deserialize)]
+        $(#[$res_meta])*
+        pub $res_record $response
+        $({ $($res_named_fields)* })?
+        $(( $($res_unnamed_fields)* );)?
+
+        impl $crate::RequestArgs for $request {
+            const PROC_NAME: &'static str = $crate::stringify_cfunc!($proc);
+            type Response = $response;
+        }
+    }
+}
+
+define_rpc_request! {
+    fn proc_ping(_req: Request) -> Result<Response, String> {
+        Ok(Response {response: "pong".to_string(),})
+    }
+
+    pub struct Request {
+        pub some_field: String,
+    }
+
+    pub struct Response {
+        pub response: String,
+    }
+
+}
+
+impl Request {
+    pub fn new(ping_message: String) -> Self {
+        Self {
+            some_field: ping_message,
+        }
+    }
+}
diff --git a/sdk/picodata-sdk/Cargo.toml b/sdk/picodata-sdk/Cargo.toml
index b5f91e2381..0f7cbb3ff2 100644
--- a/sdk/picodata-sdk/Cargo.toml
+++ b/sdk/picodata-sdk/Cargo.toml
@@ -7,3 +7,10 @@ edition = "2021"
 
 [dependencies]
 libloading = "0.8"
+picodata-types = { path = "../../picodata-types" }
+
+[dependencies.tarantool]
+# path = "./tarantool/tarantool"
+git = "https://git.picodata.io/picodata/picodata/tarantool-module.git"
+version = "4.0"
+features = ["picodata", "test"]
diff --git a/sdk/picodata-sdk/src/lib.rs b/sdk/picodata-sdk/src/lib.rs
index 8943a6b5c4..47f860ea5a 100644
--- a/sdk/picodata-sdk/src/lib.rs
+++ b/sdk/picodata-sdk/src/lib.rs
@@ -1,6 +1,9 @@
+pub extern crate picodata_types as types;
+pub extern crate tarantool;
+
 type NameCb = fn() -> &'static str;
-type StratCb = fn();
-type StopCb = fn();
+type OnlineCb = fn();
+type ShutdownCb = fn();
 
 #[derive(Debug)]
 pub struct PluginContext {
@@ -26,8 +29,8 @@ pub struct Plugin {
     lib: Option<libloading::Library>,
 
     name: NameCb,
-    start: StratCb,
-    stop: StopCb,
+    online: OnlineCb,
+    shutdown: ShutdownCb,
 }
 
 impl Plugin {
@@ -38,8 +41,8 @@ impl Plugin {
             lib: None,
             // all default callbacks must be defined here
             name: || "not implemented",
-            start: || {},
-            stop: || {},
+            online: || {},
+            shutdown: || {},
         }
     }
 
@@ -49,12 +52,12 @@ impl Plugin {
                 self.name = *symbol;
             }
 
-            if let Ok(symbol) = unsafe { plugin_library.get("start".as_bytes()) } {
-                self.start = *symbol;
+            if let Ok(symbol) = unsafe { plugin_library.get("online".as_bytes()) } {
+                self.online = *symbol;
             }
 
-            if let Ok(symbol) = unsafe { plugin_library.get("stop".as_bytes()) } {
-                self.stop = *symbol;
+            if let Ok(symbol) = unsafe { plugin_library.get("shutdown".as_bytes()) } {
+                self.shutdown = *symbol;
             }
 
             self.lib = Some(plugin_library);
@@ -69,12 +72,12 @@ impl Plugin {
         (self.name)()
     }
 
-    pub fn start(&self) {
-        (self.start)()
+    pub fn online(&self) {
+        (self.online)()
     }
 
-    pub fn stop(&self) {
-        (self.stop)()
+    pub fn shutdown(&self) {
+        (self.shutdown)()
     }
 }
 
@@ -88,8 +91,8 @@ pub trait PluginVtable {
     /***************OnAny***************/
     /// Return unique plugin name
     fn name() -> &'static str;
-    fn start() {}
-    fn stop() {}
+    fn online() {}
+    fn shutdown() {}
 }
 
 #[cfg(test)]
@@ -120,7 +123,6 @@ mod tests {
         let plugin_2 = plugin_2.unwrap();
 
         assert!(std::matches!(plugin_1.name(), "plugin 1"));
-
         assert!(std::matches!(plugin_2.name(), "plugin 2"));
     }
 }
diff --git a/sdk/plugin-example/Cargo.toml b/sdk/plugin-example/Cargo.toml
index 523a217f42..6c4636b1be 100644
--- a/sdk/plugin-example/Cargo.toml
+++ b/sdk/plugin-example/Cargo.toml
@@ -7,7 +7,6 @@ edition = "2021"
 
 [dependencies]
 picodata-sdk = {path = "../picodata-sdk"}
-# serde_json = "1.0.104"
 
 [lib]
 crate-type = ["lib", "cdylib"]
diff --git a/sdk/plugin-example/src/lib.rs b/sdk/plugin-example/src/lib.rs
index 430056f7e9..8c6b718af7 100644
--- a/sdk/plugin-example/src/lib.rs
+++ b/sdk/plugin-example/src/lib.rs
@@ -1,7 +1,33 @@
+use picodata_sdk::tarantool;
+use picodata_sdk::types;
 use picodata_sdk::PluginVtable;
 
 struct Plugin1 {}
 
+/// It must be here? but can't because of macro use
+// define_rpc_request! {
+//     fn proc_ping(_req: Request) -> Result<Response, String> {
+//         Ok(Response {response: "pong".to_string(),})
+//     }
+
+//     pub struct Request {
+//         pub some_field: String,
+//     }
+
+//     pub struct Response {
+//         pub response: String,
+//     }
+
+// }
+
+// impl Request {
+//     pub fn new(ping_message: String) -> Self {
+//         Self {
+//             some_field: ping_message,
+//         }
+//     }
+// }
+
 impl PluginVtable for Plugin1 {
     #[no_mangle]
     fn name() -> &'static str {
@@ -9,12 +35,52 @@ impl PluginVtable for Plugin1 {
     }
 
     #[no_mangle]
-    fn start() {
-        println!("PLUGIN 1 START CALLBACK");
+    fn online() {
+        println!("PLUGIN 1 ONLINE CALLBACK");
+        let lua = tarantool::lua_state();
+
+        // still need to test it somehow
+        match lua.exec(
+            "pico.sql([[
+            create table 'characters' (
+            'id' integer,
+            'name' text not null,
+            'year' integer,
+            primary key ('id')
+        )
+        using memtx distributed by ('id')
+        option (timeout = 3.0) ;
+        ]], {})",
+        ) {
+            Ok(_) => println!("PLUGIN 1 pico.sql succeed"),
+            Err(err) => println!("PLUGIN 1 pico.sql failed: {:?}", err),
+        }
+
+        match lua.exec("pico.sql([[insert into \"characters\" (\"id\", \"name\", \"year\") values (10, 'Duke Caboom', 2019) ;]])") {
+            Ok(_) => println!("PLUGIN 1 pico.sql succeed"),
+            Err(err) => println!("PLUGIN 1 pico.sql failed: {:?}", err),
+        }
+
+        // must fail
+        match lua.exec("pico.sql([[insert into \"assets\" values (1, 'Woody', 2561)]], {})") {
+            Ok(_) => println!("PLUGIN 1 pico.sql succeed"),
+            Err(err) => println!("PLUGIN 1 pico.sql failed: {:?}", err),
+        }
+
+        match lua.exec("pico.sql([[select * from \"characters\"]], {})") {
+            Ok(_) => println!("PLUGIN 1 pico.sql succeed"),
+            Err(err) => println!("PLUGIN 1 pico.sql failed: {:?}", err),
+        }
     }
 
     #[no_mangle]
-    fn stop() {
+    fn shutdown() {
         println!("PLUGIN 1 STOP CALLBACK");
+        let req = types::Request::new("PLUGIN 1 PING MESSAGE".to_string());
+        let rpc = types::network_call("127.0.0.1:3302", &req);
+        match tarantool::fiber::block_on(rpc) {
+            Ok(fut_res) => println!("PLUGIN 1 rpc sent successfully: resp {:?}", fut_res),
+            Err(err) => println!("PLUGIN 1 rpc sending failed {err}"),
+        }
     }
 }
diff --git a/sdk/plugin-example2/src/lib.rs b/sdk/plugin-example2/src/lib.rs
index 29cc7cf7c9..a626bfc895 100644
--- a/sdk/plugin-example2/src/lib.rs
+++ b/sdk/plugin-example2/src/lib.rs
@@ -9,12 +9,12 @@ impl PluginVtable for Plugin2 {
     }
 
     #[no_mangle]
-    fn start() {
+    fn online() {
         println!("PLUGIN 2 START CALLBACK");
     }
 
     #[no_mangle]
-    fn stop() {
+    fn shutdown() {
         println!("PLUGIN 2 STOP CALLBACK");
     }
 }
diff --git a/src/cas.rs b/src/cas.rs
index 5292bfa1fc..6b24e21207 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -293,7 +293,7 @@ fn proc_cas_local(req: Request) -> Result<Response> {
     Ok(Response { index, term })
 }
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// Performs a clusterwide compare and swap operation.
     /// Should be called only on the raft leader.
     ///
diff --git a/src/discovery.rs b/src/discovery.rs
index 8c8b850edd..4d7d1565e8 100644
--- a/src/discovery.rs
+++ b/src/discovery.rs
@@ -6,10 +6,10 @@ use std::collections::BTreeSet;
 use std::error::Error as StdError;
 use std::time::{Duration, Instant};
 
-use crate::stringify_cfunc;
 use crate::tarantool;
 use crate::traft;
 use crate::util::Either::{self, Left, Right};
+use picodata_types::stringify_cfunc;
 
 type Address = String;
 
diff --git a/src/lib.rs b/src/lib.rs
index d406bf606e..66ed06ea63 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -772,7 +772,7 @@ fn start_join(args: &args::Run, instance_address: String) {
 fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAccess) {
     tlog!(Info, ">>>>> postjoin()");
 
-    PluginList::global_init(&args.plugins);
+    PluginManager::init(&args.plugins);
 
     if let Some(addr) = &args.http_listen {
         start_http_server(addr);
@@ -848,9 +848,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces
         }
     }
 
-    if let Err(e) =
-        tarantool::on_shutdown(move || fiber::block_on(on_shutdown::callback(PluginList::get())))
-    {
+    if let Err(e) = tarantool::on_shutdown(move || fiber::block_on(on_shutdown::callback())) {
         tlog!(Error, "failed setting on_shutdown trigger: {e}");
     }
 
@@ -953,8 +951,4 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces
     }
 
     node.sentinel_loop.on_self_activate();
-
-    PluginList::get().iter().for_each(|plugin| {
-        plugin.start();
-    });
 }
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index 241b0b0454..15072a211f 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -3,19 +3,18 @@ use std::time::Duration;
 use ::tarantool::fiber;
 
 use crate::has_grades;
+use crate::plugin::PluginManager;
 use crate::tlog;
 use crate::traft::event;
 use crate::traft::node;
 use crate::unwrap_ok_or;
 
-use picodata_sdk::*;
-
-pub async fn callback(plugin_list: &'static [Plugin]) {
+pub async fn callback() {
     let node = node::global().unwrap();
 
     // 1. Wake up the sentinel so it starts trying to set target grade Offline.
     node.sentinel_loop.on_shut_down();
-    plugin_list.iter().for_each(|plugin| plugin.stop());
+    PluginManager::deinit();
     fiber::reschedule();
 
     // 2. Meanwhile, wait until either it succeeds or there is no quorum.
diff --git a/src/plugin.rs b/src/plugin.rs
index 29a7c0e374..baa338554f 100644
--- a/src/plugin.rs
+++ b/src/plugin.rs
@@ -1,28 +1,103 @@
 use picodata_sdk::*;
+use tarantool::fiber;
+use tarantool::fiber::r#async::watch;
 
+use crate::instance::GradeVariant;
+use crate::loop_start;
+use crate::r#loop::FlowControl;
 use crate::tlog;
 
-static mut PLUGIN_LIST: Vec<Plugin> = Vec::new();
+static mut PLUGIN_MANAGER: Option<PluginManager> = None;
 
-pub struct PluginList {}
+pub struct PluginManager {
+    _loop: Option<fiber::JoinHandle<'static, ()>>,
+    loop_waker: watch::Sender<GradeVariant>,
 
-impl PluginList {
-    pub fn global_init(plugins: &[String]) {
+    plugin_list: Vec<Plugin>,
+}
+
+struct PluginManagerState {
+    loop_waker: watch::Receiver<GradeVariant>,
+}
+
+impl PluginManager {
+    pub fn init(plugins: &[String]) {
+        assert!(unsafe { &PLUGIN_MANAGER }.is_none());
+
+        let (loop_waker_tx, loop_waker_rx) = watch::channel(GradeVariant::default());
+        let state = PluginManagerState {
+            loop_waker: loop_waker_rx,
+        };
+        let plugin_manager = PluginManager {
+            _loop: loop_start!("plugin_manager_loop", Self::iter_fn, state),
+            loop_waker: loop_waker_tx,
+            plugin_list: Self::plug_list_init(plugins),
+        };
+
+        unsafe { PLUGIN_MANAGER = Some(plugin_manager) };
+    }
+
+    async fn iter_fn(state: &mut PluginManagerState) -> FlowControl {
+        if state.loop_waker.changed().await.is_ok() {
+            println!("Invoke the plugin callback for GradeChanged event here");
+
+            let variant = state.loop_waker.borrow();
+            match *variant {
+                GradeVariant::Offline => {
+                    println!("Invoke the plugin callback for {} here", *variant)
+                }
+                GradeVariant::Replicated => {
+                    println!("Invoke the plugin callback for {} here", *variant)
+                }
+                GradeVariant::ShardingInitialized => {
+                    println!("Invoke the plugin callback for {} here", *variant)
+                }
+                GradeVariant::Online => {
+                    println!("Invoke the plugin callback for {} here", *variant);
+                    Self::online();
+                }
+                GradeVariant::Expelled => {
+                    println!("Invoke the plugin callback for {} here", *variant)
+                }
+            }
+        }
+
+        FlowControl::Continue
+    }
+
+    fn plug_list_init(plugins: &[String]) -> Vec<Plugin> {
         let mut plugin_list: Vec<Plugin> = Vec::new();
         plugins.iter().for_each(|path| {
             if let Some(plugin) = Plugin::new().load(path) {
                 plugin_list.push(plugin);
             }
         });
-
-        unsafe { PLUGIN_LIST = plugin_list };
-
-        unsafe { &PLUGIN_LIST }.iter().for_each(|plugin| {
+        plugin_list.iter().for_each(|plugin| {
             tlog!(Info, "Plugin: '{}' loaded", plugin.name());
         });
+
+        plugin_list
+    }
+
+    pub fn notify_grade_changed(variant: GradeVariant) {
+        if let Some(plugin_manager_loop) = unsafe { &PLUGIN_MANAGER } {
+            let _ = plugin_manager_loop.loop_waker.send(variant);
+        }
+    }
+
+    pub fn online() {
+        if let Some(plugin_manager_loop) = unsafe { &PLUGIN_MANAGER } {
+            plugin_manager_loop.plugin_list.iter().for_each(|plugin| {
+                plugin.online();
+            });
+        }
     }
 
-    pub fn get() -> &'static Vec<Plugin> {
-        unsafe { &PLUGIN_LIST }
+    pub fn deinit() {
+        if let Some(plugin_manager_loop) = unsafe { &PLUGIN_MANAGER } {
+            plugin_manager_loop.plugin_list.iter().for_each(|plugin| {
+                plugin.shutdown();
+            });
+        }
     }
 }
diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index 629525a8a9..169844f5f0 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -10,7 +10,7 @@ use std::time::Duration;
 use tarantool::error::TarantoolErrorCode;
 use tarantool::transaction::{transaction, TransactionError};
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// Forces the target instance to actually apply the pending schema change locally.
     ///
     /// Should be called by a governor on every replicaset master in the cluster
diff --git a/src/rpc/expel.rs b/src/rpc/expel.rs
index b1ed4881a4..8c92868721 100644
--- a/src/rpc/expel.rs
+++ b/src/rpc/expel.rs
@@ -9,7 +9,7 @@ use crate::traft::{error::Error, node};
 
 const TIMEOUT: Duration = Duration::from_secs(10);
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// Submits a request to expel the specified instance. If successful
     /// the instance's target grade - expelled - will be replicated
     /// on all of the cluster instances through Raft.
@@ -59,7 +59,7 @@ pub mod redirect {
     use crate::rpc::network_call_to_leader;
     use crate::traft::Result;
 
-    crate::define_rpc_request! {
+    picodata_types::define_rpc_request! {
         fn proc_expel_redirect(req: Request) -> Result<Response> {
             let Request(req_to_leader) = req;
             fiber::block_on(network_call_to_leader(&req_to_leader))?;
diff --git a/src/rpc/join.rs b/src/rpc/join.rs
index f1b3a42ce0..49cb09b2ce 100644
--- a/src/rpc/join.rs
+++ b/src/rpc/join.rs
@@ -19,7 +19,7 @@ use ::tarantool::fiber;
 
 const TIMEOUT: Duration = Duration::from_secs(10);
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// Submits a request to join a new instance to the cluster. If successful, the information about
     /// the new instance and its address will be replicated on all of the cluster instances
     /// through Raft.
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index c5944d39e9..b12e270fdc 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -1,18 +1,11 @@
 //! Remote procedure calls
-
-use ::tarantool::network::AsClient as _;
-use ::tarantool::network::Client;
-use ::tarantool::tuple::{DecodeOwned, Encode};
+pub use picodata_types::network_call;
+pub use picodata_types::RequestArgs;
 
 use crate::traft::error::Error;
 use crate::traft::node;
 use crate::traft::Result;
 
-use std::fmt::Debug;
-use std::io;
-
-use serde::de::DeserializeOwned;
-
 pub mod ddl_apply;
 pub mod expel;
 pub mod join;
@@ -21,36 +14,6 @@ pub mod sharding;
 pub mod snapshot;
 pub mod update_instance;
 
-/// Types implementing this trait represent an RPC's (remote procedure call)
-/// arguments. This trait contains information about the request.
-pub trait RequestArgs: Encode + DecodeOwned {
-    /// Remote procedure name.
-    const PROC_NAME: &'static str;
-
-    /// Describes data returned from a successful RPC request.
-    type Response: Encode + DeserializeOwned + Debug + 'static;
-}
-
-/// Invoke remote procedure call on an instance specified by `address`.
-pub async fn network_call<R>(address: &str, request: &R) -> ::tarantool::Result<R::Response>
-where
-    R: RequestArgs,
-{
-    // TODO: move address parsing into client
-    let (address, port) = address.rsplit_once(':').ok_or_else(|| {
-        ::tarantool::error::Error::IO(io::Error::new(
-            io::ErrorKind::InvalidInput,
-            format!("invalid address: {}", address),
-        ))
-    })?;
-    let port: u16 = port.parse().map_err(|err| {
-        ::tarantool::error::Error::IO(io::Error::new(io::ErrorKind::InvalidInput, err))
-    })?;
-    let client = Client::connect(address, port).await?;
-    let tuple = client.call(R::PROC_NAME, request).await?;
-    tuple.decode().map(|((res,),)| res)
-}
-
 /// Invoke remote procedure call on a Raft leader.
 pub async fn network_call_to_leader<R>(request: &R) -> Result<R::Response>
 where
@@ -62,48 +25,3 @@ where
     let resp = network_call(&leader_address, request).await?;
     Ok(resp)
 }
-
-#[macro_export]
-macro_rules! define_rpc_request {
-    (
-        $(#[$proc_meta:meta])*
-        fn $proc:ident($_r:ident: $_request:ty) -> $result:ty {
-            $($proc_body:tt)*
-        }
-
-        $(#[$req_meta:meta])*
-        pub $req_record:tt $request:ident
-        $({ $($req_named_fields:tt)* })?
-        $(( $($req_unnamed_fields:tt)* );)?
-
-        $(#[$res_meta:meta])*
-        pub $res_record:tt  $response:ident
-        $({ $($res_named_fields:tt)* })?
-        $(( $($res_unnamed_fields:tt)* );)?
-    ) => {
-        $(#[$proc_meta])*
-        #[::tarantool::proc(packed_args)]
-        fn $proc($_r: $_request) -> $result {
-            $($proc_body)*
-        }
-
-        impl ::tarantool::tuple::Encode for $request {}
-        #[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)]
-        $(#[$req_meta])*
-        pub $req_record $request
-        $({ $($req_named_fields)* })?
-        $(( $($req_unnamed_fields)* );)?
-
-        impl ::tarantool::tuple::Encode for $response {}
-        #[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)]
-        $(#[$res_meta])*
-        pub $res_record $response
-        $({ $($res_named_fields)* })?
-        $(( $($res_unnamed_fields)* );)?
-
-        impl $crate::rpc::RequestArgs for $request {
-            const PROC_NAME: &'static str = $crate::stringify_cfunc!($proc);
-            type Response = $response;
-        }
-    }
-}
diff --git a/src/rpc/replication.rs b/src/rpc/replication.rs
index 24af9d12b3..44f08ca94b 100644
--- a/src/rpc/replication.rs
+++ b/src/rpc/replication.rs
@@ -1,7 +1,7 @@
 use crate::tarantool::set_cfg_field;
 use crate::traft::Result;
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// Configures replication on the target replica.
     /// Specifies addresses of all the replicas in the replicaset
     /// and whether the target instance should be a replicaset master.
@@ -53,7 +53,7 @@ crate::define_rpc_request! {
 pub mod promote {
     use crate::traft::Result;
 
-    crate::define_rpc_request! {
+    picodata_types::define_rpc_request! {
         /// Promotes the target instance from read-only replica to master.
         /// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
         /// for more.
diff --git a/src/rpc/sharding.rs b/src/rpc/sharding.rs
index 8926e9fc5c..93c340434f 100644
--- a/src/rpc/sharding.rs
+++ b/src/rpc/sharding.rs
@@ -6,7 +6,7 @@ use crate::traft::{node, RaftIndex, RaftTerm};
 
 use std::time::Duration;
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// (Re)configures sharding. Sets up the vshard storage and vshard router
     /// components on the target instance. The configuration for them is taken
     /// from local storage.
@@ -70,7 +70,7 @@ crate::define_rpc_request! {
 pub mod bootstrap {
     use super::*;
 
-    crate::define_rpc_request! {
+    picodata_types::define_rpc_request! {
         /// Calls `vshard.router.bootstrap()` on the target instance.
         /// See [tarantool documentaion](https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_router/#lua-function.vshard.router.bootstrap)
         /// for more information on vshard router bootstrap process.
diff --git a/src/rpc/snapshot.rs b/src/rpc/snapshot.rs
index 780c349f7f..87fa792b03 100644
--- a/src/rpc/snapshot.rs
+++ b/src/rpc/snapshot.rs
@@ -4,7 +4,7 @@ use crate::storage::SnapshotPosition;
 use crate::traft::RaftEntryId;
 use crate::traft::Result;
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     fn proc_raft_snapshot_next_chunk(req: Request) -> Result<Response> {
         let storage = Clusterwide::try_get(false)?;
         let snapshot_data = storage.next_snapshot_data_chunk(
diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs
index 8c3052dcf7..6e3b46d74b 100644
--- a/src/rpc/update_instance.rs
+++ b/src/rpc/update_instance.rs
@@ -16,7 +16,7 @@ use ::tarantool::fiber;
 
 const TIMEOUT: Duration = Duration::from_secs(10);
 
-crate::define_rpc_request! {
+picodata_types::define_rpc_request! {
     /// Submits a request to update the specified instance. If successful
     /// the updated information about the instance will be replicated
     /// on all of the cluster instances through Raft.
@@ -167,6 +167,11 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
         }
         node.main_loop.wakeup();
         drop(guard);
+
+        if let Some(grade) = req.target_grade {
+            crate::plugin::PluginManager::notify_grade_changed(grade);
+        }
+
         return Ok(());
     }
 }
diff --git a/src/sync.rs b/src/sync.rs
index 9c2cf7753e..b3d60323bb 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -37,7 +37,7 @@ pub async fn call_get_vclock(
     let vclock: Vclock = pool
         .call_raw(
             instance_id,
-            crate::stringify_cfunc!(proc_get_vclock),
+            picodata_types::stringify_cfunc!(proc_get_vclock),
             &(),
             None,
         )?
@@ -58,7 +58,7 @@ pub struct WaitVclockRpc {
 impl Encode for WaitVclockRpc {}
 
 impl rpc::RequestArgs for WaitVclockRpc {
-    const PROC_NAME: &'static str = crate::stringify_cfunc!(proc_wait_vclock);
+    const PROC_NAME: &'static str = picodata_types::stringify_cfunc!(proc_wait_vclock);
     type Response = (Vclock,);
 }
 
@@ -116,7 +116,7 @@ pub async fn call_get_index(
     let (index,): (RaftIndex,) = pool
         .call_raw(
             instance_id,
-            crate::stringify_cfunc!(proc_get_index),
+            picodata_types::stringify_cfunc!(proc_get_index),
             &(),
             None,
         )?
@@ -136,7 +136,7 @@ pub struct ReadIndexRpc {
 impl Encode for ReadIndexRpc {}
 
 impl rpc::RequestArgs for ReadIndexRpc {
-    const PROC_NAME: &'static str = crate::stringify_cfunc!(proc_read_index);
+    const PROC_NAME: &'static str = picodata_types::stringify_cfunc!(proc_read_index);
     type Response = (RaftIndex,);
 }
 
@@ -163,7 +163,7 @@ pub struct WaitIndexRpc {
 impl Encode for WaitIndexRpc {}
 
 impl rpc::RequestArgs for WaitIndexRpc {
-    const PROC_NAME: &'static str = crate::stringify_cfunc!(proc_wait_index);
+    const PROC_NAME: &'static str = picodata_types::stringify_cfunc!(proc_wait_index);
     type Response = (RaftIndex,);
 }
 
diff --git a/src/tarantool.rs b/src/tarantool.rs
index 0993c92f54..11aa76e5cd 100644
--- a/src/tarantool.rs
+++ b/src/tarantool.rs
@@ -9,28 +9,6 @@ use ::tarantool::tlua::{self, LuaError, LuaFunction, LuaRead, LuaTable, LuaThrea
 pub use ::tarantool::trigger::on_shutdown;
 use ::tarantool::tuple::ToTupleBuffer;
 
-#[macro_export]
-macro_rules! stringify_last_token {
-    ($tail:tt) => { std::stringify!($tail) };
-    ($head:tt $($tail:tt)+) => { $crate::stringify_last_token!($($tail)+) };
-}
-
-/// Checks that the given function exists and returns it's name suitable for
-/// calling it via tarantool rpc.
-///
-/// The argument can be a full path to the function.
-#[macro_export]
-macro_rules! stringify_cfunc {
-    ( $($func_name:tt)+ ) => {{
-        use ::tarantool::tuple::FunctionArgs;
-        use ::tarantool::tuple::FunctionCtx;
-        use libc::c_int;
-
-        const _: unsafe extern "C" fn(FunctionCtx, FunctionArgs) -> c_int = $($func_name)+;
-        concat!(".", $crate::stringify_last_token!($($func_name)+))
-    }};
-}
-
 mod ffi {
     use libc::c_char;
     use libc::c_int;
diff --git a/src/traft/network.rs b/src/traft/network.rs
index cadc9908dd..bd5f855430 100644
--- a/src/traft/network.rs
+++ b/src/traft/network.rs
@@ -45,7 +45,9 @@ pub struct WorkerOptions {
 impl Default for WorkerOptions {
     fn default() -> Self {
         Self {
-            raft_msg_handler: crate::stringify_cfunc!(crate::traft::node::proc_raft_interact),
+            raft_msg_handler: picodata_types::stringify_cfunc!(
+                crate::traft::node::proc_raft_interact
+            ),
             call_timeout: DEFAULT_CALL_TIMEOUT,
             max_concurrent_futs: DEFAULT_CUNCURRENT_FUTURES,
         }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 45b255e340..ed9e5660f7 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -21,7 +21,6 @@ use crate::storage::SnapshotData;
 use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable};
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
-use crate::stringify_cfunc;
 use crate::sync;
 use crate::tlog;
 use crate::traft;
@@ -43,6 +42,7 @@ use crate::unwrap_ok_or;
 use crate::unwrap_some_or;
 use crate::util::AnyWithTypeName;
 use crate::warn_or_panic;
+use picodata_types::stringify_cfunc;
 
 use ::raft::prelude as raft;
 use ::raft::Error as RaftError;
diff --git a/test/conftest.py b/test/conftest.py
index f9714235df..3fd4c071e4 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -405,6 +405,7 @@ class Instance:
     color: Callable[[str], str]
 
     tier: str | None = None
+    plugins: str | None = None
     init_replication_factor: int | None = None
     init_cfg_path: str | None = None
     instance_id: str | None = None
@@ -447,6 +448,7 @@ class Instance:
             *(["--init-cfg", self.init_cfg_path]
               if self.init_cfg_path is not None else []),
             *(["--tier", self.tier] if self.tier is not None else []),
+            *(["--plugins", self.plugins] if self.plugins is not None else []),
         ]
         # fmt: on
 
@@ -993,6 +995,7 @@ class Cluster:
         instance_count: int,
         init_replication_factor: int | None = None,
         tier: str | None = None,
+        plugins: str | None = None,
     ) -> list[Instance]:
         assert not self.instances, "Already deployed"
 
@@ -1000,6 +1003,7 @@ class Cluster:
             self.add_instance(
                 wait_online=False,
                 tier=tier,
+                plugins=plugins,
                 init_replication_factor=init_replication_factor,
             )
 
@@ -1028,6 +1032,7 @@ class Cluster:
         failure_domain=dict(),
         init_replication_factor: int | None = None,
         tier: str | None = None,
+        plugins: str | None = None,
     ) -> Instance:
         """Add an `Instance` into the list of instances of the cluster and wait
         for it to attain Online grade unless `wait_online` is `False`.
@@ -1075,6 +1080,7 @@ class Cluster:
             failure_domain=failure_domain,
             init_replication_factor=init_replication_factor,
             tier=tier,
+            plugins=plugins,
             init_cfg_path=self.cfg_path,
         )
 
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 68ac690366..91165bd571 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -294,3 +294,31 @@ def test_governor_notices_restarts(instance: Instance):
     check_vshard_configured(instance)
 
     assert instance.current_grade() == dict(variant="Online", incarnation=2)
+
+
+def test_plugins(cluster: Cluster):
+    i1, *_ = cluster.deploy(
+        instance_count=1, plugins="target/debug/libplugin_example.so"
+    )
+    print(i1.call("pico.instance_info", "i1"))
+
+    # plugin1 create table on 'online' cb, check if cb really called strictly on grade online,
+    # via checking if table exist:
+    i1.wait_online()
+
+    # From Rust plugin1 call the following:
+    #  lua.exec(
+    #         "pico.sql([[
+    #         create table \"characters\" (
+    #         \"id\" integer,
+    #         \"name\" text not null,
+    #         \"year\" integer,
+    #         primary key (\"id\")
+    #     )
+    #     using memtx distributed by (\"id\")
+    #     option (timeout = 3.0) ;
+    #     ]], {})",);
+
+    # But the next line provoke the following error: message='sbroad: space CHARACTERS not found'
+    # data = i1.sql("""select * from characters""")
+    # # assert data["rows"] == [[], [], []]
-- 
GitLab