diff --git a/src/luamod.rs b/src/luamod.rs index 3df829452ba527524b60bf15544ec7ad0337029e..51cd5f69ef07cbc027a5db3d147757f7d128ec83 100644 --- a/src/luamod.rs +++ b/src/luamod.rs @@ -9,6 +9,7 @@ use crate::schema::{self, CreateSpaceParams}; use crate::traft::error::Error; use crate::traft::op::{self, Op}; use crate::traft::{self, node, RaftIndex, RaftTerm}; +use crate::util::duration_from_secs_f64_clamped; use crate::util::str_eq; use crate::{args, rpc, sync, tlog}; use ::tarantool::fiber; @@ -354,7 +355,7 @@ pub(crate) fn setup(args: &args::Run) { (nil, string) in case of an error "}, tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> { - traft::node::global()?.read_index(Duration::from_secs_f64(timeout)) + traft::node::global()?.read_index(duration_from_secs_f64_clamped(timeout)) }), ); luamod_set( @@ -384,7 +385,7 @@ pub(crate) fn setup(args: &args::Run) { tlua::function2( |target: RaftIndex, timeout: f64| -> traft::Result<RaftIndex> { let node = traft::node::global()?; - node.wait_index(target, Duration::from_secs_f64(timeout)) + node.wait_index(target, duration_from_secs_f64_clamped(timeout)) }, ), ); @@ -561,7 +562,7 @@ pub(crate) fn setup(args: &args::Run) { "}, tlua::function2( |target: Vclock, timeout: f64| -> Result<Vclock, sync::TimeoutError> { - sync::wait_vclock(target, Duration::from_secs_f64(timeout)) + sync::wait_vclock(target, duration_from_secs_f64_clamped(timeout)) }, ), ); @@ -622,53 +623,11 @@ pub(crate) fn setup(args: &args::Run) { }), ); - luamod_set( - &l, - "_prepare_schema_change", - indoc! {" - pico._prepare_schema_change(op, timeout) - ============================ - - Internal API, see src/luamod.rs for the details. - - Params: - - 1. op (table) - 2. timeout (number) seconds - - Returns: - - (number) raft index - or - (nil, error) in case of an error - "}, - tlua::Function::new(|lua: tlua::StaticLua| -> traft::Result<RaftIndex> { - use tlua::{AnyLuaString, AsLua, LuaError, LuaTable}; - - let t: LuaTable<_> = (&lua).read_at(1).map_err(|(_, e)| LuaError::from(e))?; - // We do [lua value -> msgpack -> rust -> msgpack] - // instead of [lua value -> rust -> msgpack] - // because despite what it may seem this is much simpler. - // (The final [-> msgpack] is when we eventually do the rpc). - // The transmition medium is always msgpack. - let mp: AnyLuaString = lua - .eval_with("return require 'msgpack'.encode(...)", &t) - .map_err(LuaError::from)?; - let op: Op = Decode::decode(mp.as_bytes())?; - - let timeout: f64 = (&lua).read_at(2).map_err(|(_, e)| LuaError::from(e))?; - let timeout = Duration::from_secs_f64(timeout); - - let index = schema::prepare_schema_change(op, timeout)?; - Ok(index) - }), - ); - luamod_set( &l, "_schema_change_cas_request", indoc! {" - pico._schema_change_cas_request(op, index) + pico._schema_change_cas_request(op, index, timeout) ============================ Internal API, see src/luamod.rs for the details. @@ -719,7 +678,7 @@ pub(crate) fn setup(args: &args::Run) { let index: RaftIndex = (&lua).read_at(2).map_err(|(_, e)| LuaError::from(e))?; let timeout: f64 = (&lua).read_at(3).map_err(|(_, e)| LuaError::from(e))?; - let timeout = Duration::from_secs_f64(timeout); + let timeout = duration_from_secs_f64_clamped(timeout); let node = node::global()?; let term = raft::Storage::term(&node.raft_storage, index)?; @@ -1300,7 +1259,7 @@ pub(crate) fn setup(args: &args::Run) { "}, { tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> { - schema::abort_ddl(Duration::from_secs_f64(timeout)) + schema::abort_ddl(duration_from_secs_f64_clamped(timeout)) }) }, ); @@ -1340,7 +1299,7 @@ pub(crate) fn setup(args: &args::Run) { timeout = t; } } - let timeout = Duration::from_secs_f64(timeout); + let timeout = duration_from_secs_f64_clamped(timeout); let commit_index = schema::wait_for_ddl_commit(index, timeout)?; Ok(commit_index) }, diff --git a/src/sync.rs b/src/sync.rs index 09c11021c69ade94cbefbc0c0630b442d399088f..6c5d8c08bf1d394f94e781472dd9676b5f9276a2 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -9,6 +9,7 @@ use std::time::Duration; use crate::traft::network::IdOfInstance; use crate::traft::{ConnectionPool, RaftIndex}; +use crate::util::duration_from_secs_f64_clamped; use crate::{rpc, traft}; #[derive(thiserror::Error, Debug)] @@ -61,7 +62,7 @@ impl rpc::Request for WaitVclockRpc { /// See [`wait_vclock`] #[proc] fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutError> { - wait_vclock(target, Duration::from_secs_f64(timeout)).map(|vclock| (vclock,)) + wait_vclock(target, duration_from_secs_f64_clamped(timeout)).map(|vclock| (vclock,)) } /// Block current fiber until Tarantool [`Vclock`] reaches the `target`. @@ -135,7 +136,7 @@ impl rpc::Request for ReadIndexRpc { #[proc] fn proc_read_index(timeout: f64) -> traft::Result<(RaftIndex,)> { let node = traft::node::global()?; - node.read_index(Duration::from_secs_f64(timeout)) + node.read_index(duration_from_secs_f64_clamped(timeout)) .map(|index| (index,)) } @@ -162,7 +163,7 @@ impl rpc::Request for WaitIndexRpc { #[proc] fn proc_wait_index(target: RaftIndex, timeout: f64) -> traft::Result<(RaftIndex,)> { let node = traft::node::global()?; - node.wait_index(target, Duration::from_secs_f64(timeout)) + node.wait_index(target, duration_from_secs_f64_clamped(timeout)) .map(|index| (index,)) } diff --git a/src/util.rs b/src/util.rs index 9d1c1850db7e97988b39bbb50b623780d622d94a..b4d71f134fdfaa29b287d6bd484053c552cbe355 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,17 +1,26 @@ use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN}; +use crate::traft::error::Error; use std::any::{Any, TypeId}; use std::io::BufRead as _; use std::io::BufReader; use std::io::Write as _; use std::os::unix::io::AsRawFd as _; use std::time::Duration; - -use crate::traft::error::Error; pub use Either::{Left, Right}; pub const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60); +/// Converts `secs` to `Duration`. If `secs` is negative, it's clamped to zero. +#[inline(always)] +pub fn duration_from_secs_f64_clamped(secs: f64) -> Duration { + if secs > 0.0 { + Duration::from_secs_f64(secs) + } else { + Duration::ZERO + } +} + //////////////////////////////////////////////////////////////////////////////// /// A generic enum that contains exactly one of two possible types. Equivalent /// to `std::result::Result`, but is more intuitive in some cases.