Skip to content
Snippets Groups Projects
Commit 0c7ee55c authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

refactor: remove the last usage of tarantool::net_box

parent d930272a
No related branches found
No related tags found
1 merge request!876pico service password file
use ::tarantool::fiber::{mutex::MutexGuard, sleep, Mutex};
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::sleep;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::{mutex::MutexGuard, Mutex};
use ::tarantool::proc;
use ::tarantool::uuid::Uuid;
use serde::{Deserialize, Serialize};
......@@ -7,7 +10,6 @@ use std::error::Error as StdError;
use std::time::{Duration, Instant};
use crate::stringify_cfunc;
use crate::tarantool;
use crate::traft;
use crate::util::Either::{self, Left, Right};
......@@ -167,7 +169,12 @@ pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) {
unsafe { DISCOVERY = Some(Box::new(Mutex::new(d))) }
}
#[inline(always)]
pub fn wait_global() -> Role {
fiber::block_on(wait_global_async())
}
pub async fn wait_global_async() -> Role {
loop {
let d = discovery().expect("discovery uninitialized");
let (request, curr_peers) = match d.next_or_role() {
......@@ -177,18 +184,27 @@ pub fn wait_global() -> Role {
drop(d); // release the lock before doing i/o
let round_start = Instant::now();
for address in curr_peers {
if let Some(response) = tarantool::net_box_call_or_log(
let res = crate::rpc::network_call_raw(
&address,
stringify_cfunc!(proc_discover),
(&request, &address),
Duration::from_secs(2),
) {
discovery()
&(&request, &address),
)
.timeout(Duration::from_secs(2))
.await;
match res {
Ok(response) => discovery()
.expect("discovery deinitialized")
.handle_response(address, response)
.handle_response(address, response),
Err(e) => {
crate::tlog!(
Warning,
"calling .proc_discover failed to '{address}' failed: {e}"
);
}
}
}
sleep(Duration::from_millis(200).saturating_sub(round_start.elapsed()))
let time_left_to_sleep = Duration::from_millis(200).saturating_sub(round_start.elapsed());
sleep(time_left_to_sleep).await;
}
}
......
......@@ -43,10 +43,26 @@ where
Ok(res)
}
/// Invoke remote procedure call on an instance specified by `address`.
/// Create a one-time iproto connection and send a remote procedure call `request`
/// to the instance specified by `address`.
#[inline(always)]
pub async fn network_call<R>(address: &str, request: &R) -> ::tarantool::Result<R::Response>
where
R: RequestArgs,
{
network_call_raw(address, R::PROC_NAME, request).await
}
/// Create a one-time iproto connection and send a request to execute stored
/// procedure `proc` with provided `args` on the instance specified by `address`.
pub async fn network_call_raw<A, R>(
address: &str,
proc: &'static str,
args: &A,
) -> ::tarantool::Result<R>
where
A: tarantool::tuple::ToTupleBuffer,
R: serde::de::DeserializeOwned + 'static,
{
// TODO: move address parsing into client
let (address, port) = address.rsplit_once(':').ok_or_else(|| {
......@@ -66,11 +82,12 @@ where
));
let client = Client::connect_with_config(address, port, config).await?;
let tuple = client.call(R::PROC_NAME, request).await?;
let tuple = client.call(proc, args).await?;
decode_iproto_return_value(tuple)
}
/// Invoke remote procedure call on a Raft leader.
/// Create a one-time iproto connection and send a remote procedure call `request`
/// to the current raft leader.
pub async fn network_call_to_leader<R>(request: &R) -> Result<R::Response>
where
R: RequestArgs,
......
use file_shred::*;
use std::ffi::CStr;
use std::os::unix::ffi::OsStrExt;
use std::time::Duration;
use std::time::Instant;
use crate::pico_service::pico_service_password;
use crate::schema::PICO_SERVICE_USER_NAME;
use ::tarantool::fiber;
use ::tarantool::lua_state;
use ::tarantool::net_box;
use ::tarantool::tlua::{self, LuaError, LuaFunction, LuaRead, LuaTable, LuaThread, PushGuard};
pub use ::tarantool::trigger::on_shutdown;
use ::tarantool::tuple::ToTupleBuffer;
#[macro_export]
macro_rules! stringify_last_token {
......@@ -196,65 +190,6 @@ where
l.eval(code)
}
// fn net_box_repeat_call_until_succeed<Args, Res, Addr>(
pub fn net_box_call<Args, Res, Addr>(
address: Addr,
fn_name: &str,
args: &Args,
timeout: Duration,
) -> Result<Res, ::tarantool::error::Error>
where
Args: ToTupleBuffer,
Addr: std::net::ToSocketAddrs + std::fmt::Display,
Res: serde::de::DeserializeOwned,
{
let now = Instant::now();
let conn_opts = net_box::ConnOptions {
user: PICO_SERVICE_USER_NAME.into(),
password: pico_service_password().into(),
connect_timeout: timeout,
..Default::default()
};
let conn = net_box::Conn::new(&address, conn_opts, None)?;
let call_opts = net_box::Options {
timeout: Some(timeout.saturating_sub(now.elapsed())),
..Default::default()
};
let tuple = conn
.call(fn_name, args, &call_opts)?
.expect("unexpected net_box result Ok(None)");
crate::rpc::decode_iproto_return_value(tuple)
}
#[inline]
pub fn net_box_call_or_log<Args, Res, Addr>(
address: Addr,
fn_name: &str,
args: Args,
timeout: Duration,
) -> Option<Res>
where
Args: ToTupleBuffer,
Addr: std::net::ToSocketAddrs + std::fmt::Display + slog::Value,
Res: serde::de::DeserializeOwned,
{
match net_box_call(&address, fn_name, &args, timeout) {
Ok(res) => Some(res),
Err(e) => {
crate::tlog!(Warning, "net_box_call failed: {e}";
"peer" => &address,
"fn" => fn_name,
);
None
}
}
}
/// Analogue of tarantool's `os.exit(code)`. Use this function if tarantool's
/// [`on_shutdown`] triggers must run. If instead you want to skip on_shutdown
/// triggers, use [`std::process::exit`] instead.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment