diff --git a/src/lib.rs b/src/lib.rs index fef857ef821c2f9f4cbe31a96be1535e8565a7ea..db334a518c27dec85e3238ff0e30a83f60c825e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,8 @@ use serde::{Deserialize, Serialize}; use ::raft::prelude as raft; use ::tarantool::error::Error as TntError; use ::tarantool::fiber; +use ::tarantool::fiber::r#async::timeout; +use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; use std::convert::TryFrom; @@ -152,13 +154,10 @@ fn picolib_setup(args: &args::Run) { let cluster_id = raft_storage .cluster_id()? .expect("cluster_id is set on boot"); - rpc::net_box_call_to_leader( - &rpc::expel::Request { - instance_id, - cluster_id, - }, - Duration::MAX, - )?; + fiber::block_on(rpc::network_call_to_leader(&rpc::expel::Request { + instance_id, + cluster_id, + }))?; Ok(()) }), ); @@ -787,7 +786,7 @@ fn start_join(args: &args::Run, leader_address: String) { let now = Instant::now(); // TODO: exponential decay let timeout = Duration::from_secs(1); - match rpc::net_box_call(&leader_address, &req, Duration::MAX) { + match fiber::block_on(rpc::network_call(&leader_address, &req)) { Ok(join::Response::Ok(resp)) => { break resp; } @@ -800,7 +799,7 @@ fn start_join(args: &args::Run, leader_address: String) { } continue; } - Err(TntError::IO(e)) => { + Err(TntError::Tcp(e)) => { tlog!(Warning, "join request failed: {e}, retry..."); fiber::sleep(timeout.saturating_sub(now.elapsed())); continue; @@ -928,7 +927,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces // leader is permitted to propose PersistInstance entries. let now = Instant::now(); let timeout = Duration::from_secs(10); - match rpc::net_box_call(&leader_address, &req, timeout) { + match fiber::block_on(rpc::network_call(&leader_address, &req).timeout(timeout)) { Ok(update_instance::Response::Ok) => { break; } @@ -937,7 +936,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces fiber::sleep(Duration::from_millis(100)); continue; } - Err(TntError::IO(e)) => { + Err(timeout::Error::Failed(TntError::Tcp(e))) => { tlog!(Warning, "failed to activate myself: {e}, retry..."); fiber::sleep(timeout.saturating_sub(now.elapsed())); continue; @@ -950,16 +949,12 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces } } -pub fn tt_expel(args: args::Expel) { +pub async fn tt_expel(args: args::Expel) { let req = rpc::expel::Request { cluster_id: args.cluster_id, instance_id: args.instance_id, }; - let res = rpc::net_box_call( - &args.peer_address, - &rpc::expel::redirect::Request(req), - Duration::MAX, - ); + let res = rpc::network_call(&args.peer_address, &rpc::expel::redirect::Request(req)).await; match res { Ok(_) => { tlog!(Info, "Success expel call"); diff --git a/src/main.rs b/src/main.rs index 9e27ca3faa9ad71b56c0a5fc2dc23dd8705e7d8e..e6332d5303a62cc2f621d5b36187c60041bc0f1b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -228,7 +228,7 @@ fn main_expel(args: args::Expel) -> ! { callback_data: (args,), callback_data_type: (args::Expel,), callback_body: { - picodata::tt_expel(args) + ::tarantool::fiber::block_on(picodata::tt_expel(args)) } ); std::process::exit(rc); diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs index a25b8d8467962d5f57a0f7d454e5039c81b94d5f..2ef842f0e5b520f0820beae40d72a4bdb0cd92af 100644 --- a/src/on_shutdown.rs +++ b/src/on_shutdown.rs @@ -112,7 +112,7 @@ fn go_offline() -> traft::Result<()> { fiber::sleep(wait_before_retry.saturating_sub(now.elapsed())); continue; }; - let res = match rpc::net_box_call(&leader_address, &req, Duration::MAX) { + let res = match fiber::block_on(rpc::network_call(&leader_address, &req)) { Ok(update_instance::Response::Ok) => Ok(()), Ok(update_instance::Response::ErrNotALeader) => Err(Error::NotALeader), Err(e) => Err(e.into()), diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs index f91fea088d1c1adea18ab1fad53198a32025f70d..290544b5af2f761f8ac42fec2567662d45be6b48 100644 --- a/src/traft/rpc/expel.rs +++ b/src/traft/rpc/expel.rs @@ -44,15 +44,15 @@ crate::define_rpc_request! { } pub mod redirect { - use crate::traft::rpc::net_box_call_to_leader; - use crate::traft::Result; + use ::tarantool::fiber; - use std::time::Duration; + use crate::traft::rpc::network_call_to_leader; + use crate::traft::Result; crate::define_rpc_request! { fn proc_expel_redirect(req: Request) -> Result<Response> { let Request(req_to_leader) = req; - net_box_call_to_leader(&req_to_leader, Duration::MAX)?; + fiber::block_on(network_call_to_leader(&req_to_leader))?; Ok(Response {}) } diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs index 5ef022865006d51cfebf30e66f34b2d894278136..569d25c1b862057736c5e9d47bcb05bbe02282a9 100644 --- a/src/traft/rpc/mod.rs +++ b/src/traft/rpc/mod.rs @@ -1,12 +1,13 @@ +use ::tarantool::network::AsClient as _; +use ::tarantool::network::Client; use ::tarantool::tuple::{DecodeOwned, Encode}; use crate::traft::error::Error; use crate::traft::node; use crate::traft::Result; -use std::fmt::{Debug, Display}; -use std::net::ToSocketAddrs; -use std::time::Duration; +use std::fmt::Debug; +use std::io; use serde::de::DeserializeOwned; @@ -28,28 +29,36 @@ pub trait Request: Encode + DecodeOwned { type Response: Encode + DeserializeOwned + Debug + 'static; } -// FIXME: should this go through pool? -#[inline(always)] -pub fn net_box_call<R>( - address: impl ToSocketAddrs + Display, - request: &R, - timeout: Duration, -) -> ::tarantool::Result<R::Response> +pub async fn network_call<R>(address: &str, request: &R) -> ::tarantool::Result<R::Response> where R: Request, { - crate::tarantool::net_box_call(&address, R::PROC_NAME, request, timeout) + // TODO: move address parsing into client + let (address, port) = address.rsplit_once(':').ok_or_else(|| { + ::tarantool::error::Error::IO(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid address: {}", address), + )) + })?; + let port: u16 = port.parse().map_err(|err| { + ::tarantool::error::Error::IO(io::Error::new(io::ErrorKind::InvalidInput, err)) + })?; + let client = Client::connect(address, port).await?; + let tuple = client + .call(R::PROC_NAME, request) + .await? + .expect("unexpected result Ok(None)"); + tuple.decode().map(|((res,),)| res) } -#[inline] -pub fn net_box_call_to_leader<R>(request: &R, timeout: Duration) -> Result<R::Response> +pub async fn network_call_to_leader<R>(request: &R) -> Result<R::Response> where R: Request, { let node = node::global()?; let leader_id = node.status().leader_id.ok_or(Error::LeaderUnknown)?; let leader_address = node.storage.peer_addresses.try_get(leader_id)?; - let resp = net_box_call(&leader_address, request, timeout)?; + let resp = network_call(&leader_address, request).await?; Ok(resp) } diff --git a/tarantool b/tarantool index b4a2aef3adc269a5afe70ca1f2f799375c3dbe55..b19e78816b24c587dfb931f22a3a4d81f9cfc0f0 160000 --- a/tarantool +++ b/tarantool @@ -1 +1 @@ -Subproject commit b4a2aef3adc269a5afe70ca1f2f799375c3dbe55 +Subproject commit b19e78816b24c587dfb931f22a3a4d81f9cfc0f0