Skip to content
Snippets Groups Projects
Commit ab37abfa authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix: used to panic when converting f64 to Duration sometimes

parent a237b742
No related branches found
No related tags found
1 merge request!613Fix/reentarable ddl api
...@@ -9,6 +9,7 @@ use crate::schema::{self, CreateSpaceParams}; ...@@ -9,6 +9,7 @@ use crate::schema::{self, CreateSpaceParams};
use crate::traft::error::Error; use crate::traft::error::Error;
use crate::traft::op::{self, Op}; use crate::traft::op::{self, Op};
use crate::traft::{self, node, RaftIndex, RaftTerm}; use crate::traft::{self, node, RaftIndex, RaftTerm};
use crate::util::duration_from_secs_f64_clamped;
use crate::util::str_eq; use crate::util::str_eq;
use crate::{args, rpc, sync, tlog}; use crate::{args, rpc, sync, tlog};
use ::tarantool::fiber; use ::tarantool::fiber;
...@@ -354,7 +355,7 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -354,7 +355,7 @@ pub(crate) fn setup(args: &args::Run) {
(nil, string) in case of an error (nil, string) in case of an error
"}, "},
tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> { tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> {
traft::node::global()?.read_index(Duration::from_secs_f64(timeout)) traft::node::global()?.read_index(duration_from_secs_f64_clamped(timeout))
}), }),
); );
luamod_set( luamod_set(
...@@ -384,7 +385,7 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -384,7 +385,7 @@ pub(crate) fn setup(args: &args::Run) {
tlua::function2( tlua::function2(
|target: RaftIndex, timeout: f64| -> traft::Result<RaftIndex> { |target: RaftIndex, timeout: f64| -> traft::Result<RaftIndex> {
let node = traft::node::global()?; let node = traft::node::global()?;
node.wait_index(target, Duration::from_secs_f64(timeout)) node.wait_index(target, duration_from_secs_f64_clamped(timeout))
}, },
), ),
); );
...@@ -561,7 +562,7 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -561,7 +562,7 @@ pub(crate) fn setup(args: &args::Run) {
"}, "},
tlua::function2( tlua::function2(
|target: Vclock, timeout: f64| -> Result<Vclock, sync::TimeoutError> { |target: Vclock, timeout: f64| -> Result<Vclock, sync::TimeoutError> {
sync::wait_vclock(target, Duration::from_secs_f64(timeout)) sync::wait_vclock(target, duration_from_secs_f64_clamped(timeout))
}, },
), ),
); );
...@@ -622,53 +623,11 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -622,53 +623,11 @@ pub(crate) fn setup(args: &args::Run) {
}), }),
); );
luamod_set(
&l,
"_prepare_schema_change",
indoc! {"
pico._prepare_schema_change(op, timeout)
============================
Internal API, see src/luamod.rs for the details.
Params:
1. op (table)
2. timeout (number) seconds
Returns:
(number) raft index
or
(nil, error) in case of an error
"},
tlua::Function::new(|lua: tlua::StaticLua| -> traft::Result<RaftIndex> {
use tlua::{AnyLuaString, AsLua, LuaError, LuaTable};
let t: LuaTable<_> = (&lua).read_at(1).map_err(|(_, e)| LuaError::from(e))?;
// We do [lua value -> msgpack -> rust -> msgpack]
// instead of [lua value -> rust -> msgpack]
// because despite what it may seem this is much simpler.
// (The final [-> msgpack] is when we eventually do the rpc).
// The transmition medium is always msgpack.
let mp: AnyLuaString = lua
.eval_with("return require 'msgpack'.encode(...)", &t)
.map_err(LuaError::from)?;
let op: Op = Decode::decode(mp.as_bytes())?;
let timeout: f64 = (&lua).read_at(2).map_err(|(_, e)| LuaError::from(e))?;
let timeout = Duration::from_secs_f64(timeout);
let index = schema::prepare_schema_change(op, timeout)?;
Ok(index)
}),
);
luamod_set( luamod_set(
&l, &l,
"_schema_change_cas_request", "_schema_change_cas_request",
indoc! {" indoc! {"
pico._schema_change_cas_request(op, index) pico._schema_change_cas_request(op, index, timeout)
============================ ============================
Internal API, see src/luamod.rs for the details. Internal API, see src/luamod.rs for the details.
...@@ -719,7 +678,7 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -719,7 +678,7 @@ pub(crate) fn setup(args: &args::Run) {
let index: RaftIndex = (&lua).read_at(2).map_err(|(_, e)| LuaError::from(e))?; let index: RaftIndex = (&lua).read_at(2).map_err(|(_, e)| LuaError::from(e))?;
let timeout: f64 = (&lua).read_at(3).map_err(|(_, e)| LuaError::from(e))?; let timeout: f64 = (&lua).read_at(3).map_err(|(_, e)| LuaError::from(e))?;
let timeout = Duration::from_secs_f64(timeout); let timeout = duration_from_secs_f64_clamped(timeout);
let node = node::global()?; let node = node::global()?;
let term = raft::Storage::term(&node.raft_storage, index)?; let term = raft::Storage::term(&node.raft_storage, index)?;
...@@ -1300,7 +1259,7 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -1300,7 +1259,7 @@ pub(crate) fn setup(args: &args::Run) {
"}, "},
{ {
tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> { tlua::function1(|timeout: f64| -> traft::Result<RaftIndex> {
schema::abort_ddl(Duration::from_secs_f64(timeout)) schema::abort_ddl(duration_from_secs_f64_clamped(timeout))
}) })
}, },
); );
...@@ -1340,7 +1299,7 @@ pub(crate) fn setup(args: &args::Run) { ...@@ -1340,7 +1299,7 @@ pub(crate) fn setup(args: &args::Run) {
timeout = t; timeout = t;
} }
} }
let timeout = Duration::from_secs_f64(timeout); let timeout = duration_from_secs_f64_clamped(timeout);
let commit_index = schema::wait_for_ddl_commit(index, timeout)?; let commit_index = schema::wait_for_ddl_commit(index, timeout)?;
Ok(commit_index) Ok(commit_index)
}, },
......
...@@ -9,6 +9,7 @@ use std::time::Duration; ...@@ -9,6 +9,7 @@ use std::time::Duration;
use crate::traft::network::IdOfInstance; use crate::traft::network::IdOfInstance;
use crate::traft::{ConnectionPool, RaftIndex}; use crate::traft::{ConnectionPool, RaftIndex};
use crate::util::duration_from_secs_f64_clamped;
use crate::{rpc, traft}; use crate::{rpc, traft};
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
...@@ -61,7 +62,7 @@ impl rpc::Request for WaitVclockRpc { ...@@ -61,7 +62,7 @@ impl rpc::Request for WaitVclockRpc {
/// See [`wait_vclock`] /// See [`wait_vclock`]
#[proc] #[proc]
fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutError> { fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutError> {
wait_vclock(target, Duration::from_secs_f64(timeout)).map(|vclock| (vclock,)) wait_vclock(target, duration_from_secs_f64_clamped(timeout)).map(|vclock| (vclock,))
} }
/// Block current fiber until Tarantool [`Vclock`] reaches the `target`. /// Block current fiber until Tarantool [`Vclock`] reaches the `target`.
...@@ -135,7 +136,7 @@ impl rpc::Request for ReadIndexRpc { ...@@ -135,7 +136,7 @@ impl rpc::Request for ReadIndexRpc {
#[proc] #[proc]
fn proc_read_index(timeout: f64) -> traft::Result<(RaftIndex,)> { fn proc_read_index(timeout: f64) -> traft::Result<(RaftIndex,)> {
let node = traft::node::global()?; let node = traft::node::global()?;
node.read_index(Duration::from_secs_f64(timeout)) node.read_index(duration_from_secs_f64_clamped(timeout))
.map(|index| (index,)) .map(|index| (index,))
} }
...@@ -162,7 +163,7 @@ impl rpc::Request for WaitIndexRpc { ...@@ -162,7 +163,7 @@ impl rpc::Request for WaitIndexRpc {
#[proc] #[proc]
fn proc_wait_index(target: RaftIndex, timeout: f64) -> traft::Result<(RaftIndex,)> { fn proc_wait_index(target: RaftIndex, timeout: f64) -> traft::Result<(RaftIndex,)> {
let node = traft::node::global()?; let node = traft::node::global()?;
node.wait_index(target, Duration::from_secs_f64(timeout)) node.wait_index(target, duration_from_secs_f64_clamped(timeout))
.map(|index| (index,)) .map(|index| (index,))
} }
......
use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN}; use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN};
use crate::traft::error::Error;
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::io::BufRead as _; use std::io::BufRead as _;
use std::io::BufReader; use std::io::BufReader;
use std::io::Write as _; use std::io::Write as _;
use std::os::unix::io::AsRawFd as _; use std::os::unix::io::AsRawFd as _;
use std::time::Duration; use std::time::Duration;
use crate::traft::error::Error;
pub use Either::{Left, Right}; pub use Either::{Left, Right};
pub const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60); pub const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
/// Converts `secs` to `Duration`. If `secs` is negative, it's clamped to zero.
#[inline(always)]
pub fn duration_from_secs_f64_clamped(secs: f64) -> Duration {
if secs > 0.0 {
Duration::from_secs_f64(secs)
} else {
Duration::ZERO
}
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// A generic enum that contains exactly one of two possible types. Equivalent /// A generic enum that contains exactly one of two possible types. Equivalent
/// to `std::result::Result`, but is more intuitive in some cases. /// to `std::result::Result`, but is more intuitive in some cases.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment