From 37e7fa28195eae0ce3f9d07d9c031b64244daae6 Mon Sep 17 00:00:00 2001
From: Egor Ivkov <e.ivkov@picodata.io>
Date: Wed, 29 Mar 2023 13:55:24 +0000
Subject: [PATCH] refactor: integrate new client into rpc requests

---
 src/lib.rs             | 29 ++++++++++++-----------------
 src/main.rs            |  2 +-
 src/on_shutdown.rs     |  2 +-
 src/traft/rpc/expel.rs |  8 ++++----
 src/traft/rpc/mod.rs   | 37 +++++++++++++++++++++++--------------
 tarantool              |  2 +-
 6 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index fef857ef82..db334a518c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,8 @@ use serde::{Deserialize, Serialize};
 use ::raft::prelude as raft;
 use ::tarantool::error::Error as TntError;
 use ::tarantool::fiber;
+use ::tarantool::fiber::r#async::timeout;
+use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::convert::TryFrom;
@@ -152,13 +154,10 @@ fn picolib_setup(args: &args::Run) {
             let cluster_id = raft_storage
                 .cluster_id()?
                 .expect("cluster_id is set on boot");
-            rpc::net_box_call_to_leader(
-                &rpc::expel::Request {
-                    instance_id,
-                    cluster_id,
-                },
-                Duration::MAX,
-            )?;
+            fiber::block_on(rpc::network_call_to_leader(&rpc::expel::Request {
+                instance_id,
+                cluster_id,
+            }))?;
             Ok(())
         }),
     );
@@ -787,7 +786,7 @@ fn start_join(args: &args::Run, leader_address: String) {
         let now = Instant::now();
         // TODO: exponential decay
         let timeout = Duration::from_secs(1);
-        match rpc::net_box_call(&leader_address, &req, Duration::MAX) {
+        match fiber::block_on(rpc::network_call(&leader_address, &req)) {
             Ok(join::Response::Ok(resp)) => {
                 break resp;
             }
@@ -800,7 +799,7 @@ fn start_join(args: &args::Run, leader_address: String) {
                 }
                 continue;
             }
-            Err(TntError::IO(e)) => {
+            Err(TntError::Tcp(e)) => {
                 tlog!(Warning, "join request failed: {e}, retry...");
                 fiber::sleep(timeout.saturating_sub(now.elapsed()));
                 continue;
@@ -928,7 +927,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces
         // leader is permitted to propose PersistInstance entries.
         let now = Instant::now();
         let timeout = Duration::from_secs(10);
-        match rpc::net_box_call(&leader_address, &req, timeout) {
+        match fiber::block_on(rpc::network_call(&leader_address, &req).timeout(timeout)) {
             Ok(update_instance::Response::Ok) => {
                 break;
             }
@@ -937,7 +936,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces
                 fiber::sleep(Duration::from_millis(100));
                 continue;
             }
-            Err(TntError::IO(e)) => {
+            Err(timeout::Error::Failed(TntError::Tcp(e))) => {
                 tlog!(Warning, "failed to activate myself: {e}, retry...");
                 fiber::sleep(timeout.saturating_sub(now.elapsed()));
                 continue;
@@ -950,16 +949,12 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces
     }
 }
 
-pub fn tt_expel(args: args::Expel) {
+pub async fn tt_expel(args: args::Expel) {
     let req = rpc::expel::Request {
         cluster_id: args.cluster_id,
         instance_id: args.instance_id,
     };
-    let res = rpc::net_box_call(
-        &args.peer_address,
-        &rpc::expel::redirect::Request(req),
-        Duration::MAX,
-    );
+    let res = rpc::network_call(&args.peer_address, &rpc::expel::redirect::Request(req)).await;
     match res {
         Ok(_) => {
             tlog!(Info, "Success expel call");
diff --git a/src/main.rs b/src/main.rs
index 9e27ca3faa..e6332d5303 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -228,7 +228,7 @@ fn main_expel(args: args::Expel) -> ! {
         callback_data: (args,),
         callback_data_type: (args::Expel,),
         callback_body: {
-            picodata::tt_expel(args)
+            ::tarantool::fiber::block_on(picodata::tt_expel(args))
         }
     );
     std::process::exit(rc);
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index a25b8d8467..2ef842f0e5 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -112,7 +112,7 @@ fn go_offline() -> traft::Result<()> {
             fiber::sleep(wait_before_retry.saturating_sub(now.elapsed()));
             continue;
         };
-        let res = match rpc::net_box_call(&leader_address, &req, Duration::MAX) {
+        let res = match fiber::block_on(rpc::network_call(&leader_address, &req)) {
             Ok(update_instance::Response::Ok) => Ok(()),
             Ok(update_instance::Response::ErrNotALeader) => Err(Error::NotALeader),
             Err(e) => Err(e.into()),
diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs
index f91fea088d..290544b5af 100644
--- a/src/traft/rpc/expel.rs
+++ b/src/traft/rpc/expel.rs
@@ -44,15 +44,15 @@ crate::define_rpc_request! {
 }
 
 pub mod redirect {
-    use crate::traft::rpc::net_box_call_to_leader;
-    use crate::traft::Result;
+    use ::tarantool::fiber;
 
-    use std::time::Duration;
+    use crate::traft::rpc::network_call_to_leader;
+    use crate::traft::Result;
 
     crate::define_rpc_request! {
         fn proc_expel_redirect(req: Request) -> Result<Response> {
             let Request(req_to_leader) = req;
-            net_box_call_to_leader(&req_to_leader, Duration::MAX)?;
+            fiber::block_on(network_call_to_leader(&req_to_leader))?;
             Ok(Response {})
         }
 
diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs
index 5ef0228650..569d25c1b8 100644
--- a/src/traft/rpc/mod.rs
+++ b/src/traft/rpc/mod.rs
@@ -1,12 +1,13 @@
+use ::tarantool::network::AsClient as _;
+use ::tarantool::network::Client;
 use ::tarantool::tuple::{DecodeOwned, Encode};
 
 use crate::traft::error::Error;
 use crate::traft::node;
 use crate::traft::Result;
 
-use std::fmt::{Debug, Display};
-use std::net::ToSocketAddrs;
-use std::time::Duration;
+use std::fmt::Debug;
+use std::io;
 
 use serde::de::DeserializeOwned;
 
@@ -28,28 +29,36 @@ pub trait Request: Encode + DecodeOwned {
     type Response: Encode + DeserializeOwned + Debug + 'static;
 }
 
-// FIXME: should this go through pool?
-#[inline(always)]
-pub fn net_box_call<R>(
-    address: impl ToSocketAddrs + Display,
-    request: &R,
-    timeout: Duration,
-) -> ::tarantool::Result<R::Response>
+pub async fn network_call<R>(address: &str, request: &R) -> ::tarantool::Result<R::Response>
 where
     R: Request,
 {
-    crate::tarantool::net_box_call(&address, R::PROC_NAME, request, timeout)
+    // TODO: move address parsing into client
+    let (address, port) = address.rsplit_once(':').ok_or_else(|| {
+        ::tarantool::error::Error::IO(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("invalid address: {}", address),
+        ))
+    })?;
+    let port: u16 = port.parse().map_err(|err| {
+        ::tarantool::error::Error::IO(io::Error::new(io::ErrorKind::InvalidInput, err))
+    })?;
+    let client = Client::connect(address, port).await?;
+    let tuple = client
+        .call(R::PROC_NAME, request)
+        .await?
+        .expect("unexpected result Ok(None)");
+    tuple.decode().map(|((res,),)| res)
 }
 
-#[inline]
-pub fn net_box_call_to_leader<R>(request: &R, timeout: Duration) -> Result<R::Response>
+pub async fn network_call_to_leader<R>(request: &R) -> Result<R::Response>
 where
     R: Request,
 {
     let node = node::global()?;
     let leader_id = node.status().leader_id.ok_or(Error::LeaderUnknown)?;
     let leader_address = node.storage.peer_addresses.try_get(leader_id)?;
-    let resp = net_box_call(&leader_address, request, timeout)?;
+    let resp = network_call(&leader_address, request).await?;
     Ok(resp)
 }
 
diff --git a/tarantool b/tarantool
index b4a2aef3ad..b19e78816b 160000
--- a/tarantool
+++ b/tarantool
@@ -1 +1 @@
-Subproject commit b4a2aef3adc269a5afe70ca1f2f799375c3dbe55
+Subproject commit b19e78816b24c587dfb931f22a3a4d81f9cfc0f0
-- 
GitLab