From 2580cb2d9615c50400148ecb6b7e234a9bfe1055 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Mon, 12 Aug 2024 17:58:29 +0700
Subject: [PATCH] feat: implement SQL cache fiber

---
 Cargo.lock                                    |  16 +-
 sbroad-core/src/executor/engine/helpers.rs    |   1 +
 .../src/executor/engine/helpers/proxy.rs      | 145 ++++++++++++++++++
 .../src/executor/engine/helpers/storage.rs    |  17 +-
 4 files changed, 164 insertions(+), 15 deletions(-)
 create mode 100644 sbroad-core/src/executor/engine/helpers/proxy.rs

diff --git a/Cargo.lock b/Cargo.lock
index 63193dd16..5cb66e2d1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1311,8 +1311,8 @@ dependencies = [
 
 [[package]]
 name = "tarantool"
-version = "5.0.0"
-source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#d758e775797267a9d12e0e2afef73244a9aeaf5e"
+version = "5.1.0"
+source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#e8b43dce60620fd022bab613c1df9c8ab66f8933"
 dependencies = [
  "async-trait",
  "base64",
@@ -1344,8 +1344,8 @@ dependencies = [
 
 [[package]]
 name = "tarantool-proc"
-version = "3.1.0"
-source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#d758e775797267a9d12e0e2afef73244a9aeaf5e"
+version = "3.1.1"
+source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#e8b43dce60620fd022bab613c1df9c8ab66f8933"
 dependencies = [
  "darling",
  "proc-macro-error",
@@ -1444,8 +1444,8 @@ dependencies = [
 
 [[package]]
 name = "tlua"
-version = "3.1.0"
-source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#d758e775797267a9d12e0e2afef73244a9aeaf5e"
+version = "3.2.0"
+source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#e8b43dce60620fd022bab613c1df9c8ab66f8933"
 dependencies = [
  "libc",
  "serde",
@@ -1455,8 +1455,8 @@ dependencies = [
 
 [[package]]
 name = "tlua-derive"
-version = "0.2.0"
-source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#d758e775797267a9d12e0e2afef73244a9aeaf5e"
+version = "0.2.1"
+source = "git+https://git.picodata.io/picodata/picodata/tarantool-module.git#e8b43dce60620fd022bab613c1df9c8ab66f8933"
 dependencies = [
  "proc-macro2 1.0.85",
  "quote 1.0.36",
diff --git a/sbroad-core/src/executor/engine/helpers.rs b/sbroad-core/src/executor/engine/helpers.rs
index bbbb011ae..3b97e8d14 100644
--- a/sbroad-core/src/executor/engine/helpers.rs
+++ b/sbroad-core/src/executor/engine/helpers.rs
@@ -60,6 +60,7 @@ use tarantool::tuple::Tuple;
 
 use super::{Metadata, Router, Vshard};
 
+pub mod proxy;
 pub mod storage;
 pub mod vshard;
 
diff --git a/sbroad-core/src/executor/engine/helpers/proxy.rs b/sbroad-core/src/executor/engine/helpers/proxy.rs
new file mode 100644
index 000000000..5a47a6391
--- /dev/null
+++ b/sbroad-core/src/executor/engine/helpers/proxy.rs
@@ -0,0 +1,145 @@
+use std::cell::OnceCell;
+use std::rc::Rc;
+use std::time::Duration;
+
+use tarantool::error::Error;
+use tarantool::fiber::channel::Channel;
+use tarantool::fiber::{self, SendError};
+use tarantool::sql::{prepare, unprepare, Statement};
+
+const SEND_TIMEOUT: Duration = Duration::from_millis(1);
+
+#[derive(Debug)]
+pub(crate) enum CacheRequest {
+    Prepare((String, fiber::FiberId)),
+    UnPrepare((Statement, fiber::FiberId)),
+}
+
+impl CacheRequest {
+    fn fiber_id(&self) -> fiber::FiberId {
+        match self {
+            CacheRequest::Prepare((_, id)) => *id,
+            CacheRequest::UnPrepare((_, id)) => *id,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub(crate) enum CacheResponse {
+    Prepared(Result<Statement, Error>),
+    UnPrepared(Result<(), Error>),
+}
+
+pub(crate) struct SqlCacheProxy {
+    pub(crate) id: fiber::FiberId,
+    request: Rc<Channel<CacheRequest>>,
+    response: Rc<Channel<CacheResponse>>,
+}
+
+fn proxy_start(rq: Rc<Channel<CacheRequest>>, rsp: Rc<Channel<CacheResponse>>) -> fiber::FiberId {
+    fiber::Builder::new()
+        .name("sql_cache")
+        .func(move || {
+            'main: loop {
+                match rq.recv() {
+                    Some(req) => {
+                        let client_fiber_id = req.fiber_id();
+                        let mut result = match req {
+                            CacheRequest::Prepare((query, _)) => {
+                                CacheResponse::Prepared(prepare(query))
+                            }
+                            CacheRequest::UnPrepare((stmt, _)) => {
+                                CacheResponse::UnPrepared(unprepare(stmt))
+                            }
+                        };
+                        'send: loop {
+                            match rsp.send_timeout(result, SEND_TIMEOUT) {
+                                Ok(()) => break 'send,
+                                Err(SendError::Timeout(rsp)) => {
+                                    // Client fiber is still alive, so we can try to send
+                                    // the response again.
+                                    if fiber::wakeup(client_fiber_id) {
+                                        result = rsp;
+                                        continue 'send;
+                                    }
+                                    // Client fiber was cancelled, so there is no need
+                                    // to send the response.
+                                    break 'send;
+                                }
+                                Err(SendError::Disconnected(_)) => break 'main,
+                            }
+                        }
+                    }
+                    None => break 'main,
+                }
+            }
+            // The channel is closed or sql_cache fiber is cancelled.
+            if fiber::is_cancelled() {
+                panic!("sql_cache fiber is cancelled");
+            }
+            panic!("sql_cache request channel is closed");
+        })
+        .start_non_joinable()
+        .expect("Failed to start sql_cache fiber")
+}
+
+impl SqlCacheProxy {
+    fn new() -> Self {
+        let rq_inner = Rc::new(Channel::<CacheRequest>::new(0));
+        let rq_outer = Rc::clone(&rq_inner);
+        let rsp_inner = Rc::new(Channel::<CacheResponse>::new(0));
+        let rsp_outer = Rc::clone(&rsp_inner);
+        let id = proxy_start(rq_inner, rsp_inner);
+        SqlCacheProxy {
+            id,
+            request: rq_outer,
+            response: rsp_outer,
+        }
+    }
+
+    fn send(&self, request: CacheRequest) -> Result<(), Error> {
+        fiber::wakeup(sql_cache_proxy().id);
+        if self.request.send(request).is_err() {
+            if fiber::is_cancelled() {
+                return Err(Error::Other("current fiber is cancelled".into()));
+            }
+            panic!("sql_cache request channel is closed");
+        }
+        Ok(())
+    }
+
+    fn receive(&self) -> Result<CacheResponse, Error> {
+        match self.response.recv() {
+            Some(resp) => Ok(resp),
+            None => {
+                if fiber::is_cancelled() {
+                    return Err(Error::Other("current fiber is cancelled".into()));
+                }
+                panic!("sql_cache response channel is closed");
+            }
+        }
+    }
+
+    pub(crate) fn prepare(&self, query: String) -> Result<Statement, Error> {
+        let request = CacheRequest::Prepare((query, fiber::id()));
+        self.send(request)?;
+        match self.receive()? {
+            CacheResponse::Prepared(resp) => resp,
+            CacheResponse::UnPrepared(_) => unreachable!("Unexpected unprepare response"),
+        }
+    }
+
+    pub(crate) fn unprepare(&self, stmt: Statement) -> Result<(), Error> {
+        let request = CacheRequest::UnPrepare((stmt, fiber::id()));
+        self.send(request)?;
+        match self.receive()? {
+            CacheResponse::Prepared(_) => unreachable!("Unexpected prepare response"),
+            CacheResponse::UnPrepared(resp) => resp,
+        }
+    }
+}
+
+pub(crate) fn sql_cache_proxy() -> &'static SqlCacheProxy {
+    static mut PROXY: OnceCell<SqlCacheProxy> = OnceCell::new();
+    unsafe { PROXY.get_or_init(SqlCacheProxy::new) }
+}
diff --git a/sbroad-core/src/executor/engine/helpers/storage.rs b/sbroad-core/src/executor/engine/helpers/storage.rs
index f4f0aa9db..43a05d8c8 100644
--- a/sbroad-core/src/executor/engine/helpers/storage.rs
+++ b/sbroad-core/src/executor/engine/helpers/storage.rs
@@ -6,22 +6,20 @@ use std::collections::HashMap;
 use std::io::Read;
 use tarantool::session::with_su;
 use tarantool::space::Space;
-use tarantool::sql::{
-    prepare as tnt_prepare, prepare_and_execute_raw, unprepare as tnt_unprepare, Statement,
-};
+use tarantool::sql::{prepare_and_execute_raw, Statement};
 use tarantool::tuple::{Tuple, TupleBuffer};
 
 use crate::backend::sql::space::ADMIN_ID;
 use crate::error;
 use crate::errors::SbroadError;
+use crate::executor::engine::helpers::proxy::sql_cache_proxy;
+use crate::executor::engine::helpers::table_name;
 use crate::executor::lru::DEFAULT_CAPACITY;
 use crate::ir::value::{EncodedValue, Value};
 use crate::ir::NodeId;
 use crate::otm::child_span;
 use crate::utils::ByteCounter;
 
-use super::table_name;
-
 const IPROTO_DATA: u8 = 0x30;
 const IPROTO_META: u8 = 0x32;
 
@@ -59,7 +57,8 @@ impl StorageMetadata {
 
 #[otm_child_span("tarantool.statement.prepare")]
 pub fn prepare(pattern: String) -> Result<Statement, SbroadError> {
-    let stmt = tnt_prepare(pattern).map_err(|e| {
+    let proxy = sql_cache_proxy();
+    let stmt = proxy.prepare(pattern).map_err(|e| {
         error!(Option::from("prepare"), &format!("{e:?}"));
         SbroadError::from(e)
     })?;
@@ -74,7 +73,11 @@ pub fn unprepare(
     let (stmt, table_ids) = std::mem::take(entry);
 
     // Remove the statement from the instance cache.
-    tnt_unprepare(stmt);
+    let proxy = sql_cache_proxy();
+    proxy.unprepare(stmt).map_err(|e| {
+        error!(Option::from("unprepare"), &format!("{e:?}"));
+        SbroadError::from(e)
+    })?;
 
     // Remove temporary tables from the instance.
     for node_id in table_ids {
-- 
GitLab