diff --git a/src/schema.rs b/src/schema.rs index dfaa165c7998b156ef81aef109bb8be0b68c376a..f63b8c10c61f9afa5c7a0798660fb45da702dd11 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,7 +1,8 @@ use std::borrow::Cow; use std::collections::{BTreeMap, HashSet}; -use std::time::{Duration, Instant}; +use std::time::Duration; +use tarantool::fiber; use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType}; use tarantool::space::{Space, SystemSpace}; use tarantool::transaction::{transaction, TransactionError}; @@ -24,7 +25,6 @@ use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; use crate::traft::error::Error; use crate::traft::op::{Ddl, Op}; use crate::traft::{self, event, node, RaftIndex}; -use crate::util::instant_saturating_add; //////////////////////////////////////////////////////////////////////////////// // SpaceDef @@ -561,7 +561,7 @@ pub fn wait_for_ddl_commit( timeout: Duration, ) -> traft::Result<RaftIndex> { let raft_storage = &node::global()?.raft_storage; - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); let last_seen = prepare_commit; loop { let cur_applied = raft_storage.applied()?; @@ -579,9 +579,7 @@ pub fn wait_for_ddl_commit( } } - if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { - event::wait_timeout(event::Event::EntryApplied, timeout)?; - } else { + if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() { return Err(Error::Timeout); } } @@ -594,15 +592,13 @@ fn wait_for_no_pending_schema_change( storage: &Clusterwide, timeout: Duration, ) -> traft::Result<()> { - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); loop { if storage.properties.pending_schema_change()?.is_none() { return Ok(()); } - if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { - event::wait_timeout(event::Event::EntryApplied, timeout)?; - } else { + if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() { return Err(Error::Timeout); } } diff --git a/src/sync.rs b/src/sync.rs index 4b6d0176e5f470a4ca7a5751f5d000e07aeca544..09c11021c69ade94cbefbc0c0630b442d399088f 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -5,11 +5,10 @@ use ::tarantool::vclock::Vclock; use ::tarantool::{fiber, proc}; use serde::{Deserialize, Serialize}; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::traft::network::IdOfInstance; use crate::traft::{ConnectionPool, RaftIndex}; -use crate::util::instant_saturating_add; use crate::{rpc, traft}; #[derive(thiserror::Error, Debug)] @@ -75,14 +74,14 @@ fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutEr /// pub fn wait_vclock(target: Vclock, timeout: Duration) -> Result<Vclock, TimeoutError> { // TODO: this all should be a part of tarantool C API - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); loop { let current = Vclock::current(); if current >= target { return Ok(current); } - if Instant::now() < deadline { + if fiber::clock() < deadline { fiber::sleep(traft::node::MainLoop::TICK); } else { return Err(TimeoutError); diff --git a/src/traft/event.rs b/src/traft/event.rs index 7c20903c7480175f20a680b137b5bcfa678f2de8..551ceaede1d807208001338ddcd806a32ad09627 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -3,10 +3,11 @@ use std::collections::{HashMap, LinkedList}; use std::fmt::Write; use std::rc::Rc; use std::str::FromStr; -use std::time::{Duration, Instant}; +use std::time::Duration; -use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex}; +use ::tarantool::fiber::{self, mutex::MutexGuard, Cond, Mutex}; use ::tarantool::proc; +use ::tarantool::time::Instant; use ::tarantool::unwrap_or; use crate::tlog; @@ -93,6 +94,8 @@ impl WaitTimeout { /// Waits for the event to happen or timeout to end. /// /// Returns an error if the `EVENTS` is uninitialized. +/// +/// **This function yields** pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> { let mut events = events()?; let cond = events.regular_cond(event); @@ -106,19 +109,25 @@ pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> { } /// Waits for the event to happen or deadline to be reached. +/// Uses the [`fiber::clock`] API to get the current instant. /// /// Returns an error if the `EVENTS` is uninitialized. +/// +/// **This function yields** pub fn wait_deadline(event: Event, deadline: Instant) -> Result<WaitTimeout> { let mut events = events()?; let cond = events.regular_cond(event); // events must be released before yielding drop(events); - let timeout = deadline.saturating_duration_since(Instant::now()); - Ok(if cond.wait_timeout(timeout) { - WaitTimeout::Signal + if let Some(timeout) = deadline.checked_duration_since(fiber::clock()) { + if cond.wait_timeout(timeout) { + Ok(WaitTimeout::Signal) + } else { + Ok(WaitTimeout::Timeout) + } } else { - WaitTimeout::Timeout - }) + Ok(WaitTimeout::Timeout) + } } /// Signals to everybody who's waiting for this `event` either repeated or one diff --git a/src/traft/node.rs b/src/traft/node.rs index 43a6ae95dd771f6b642555dbcb63f0908f483fa2..d4c36872e66dd831e55e935dd6f83563e48437b7 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -38,7 +38,6 @@ use crate::traft::RaftIndex; use crate::traft::RaftSpaceAccess; use crate::traft::RaftTerm; use crate::traft::Topology; -use crate::util::instant_saturating_add; use crate::util::AnyWithTypeName; use crate::warn_or_panic; use ::raft::prelude as raft; @@ -57,6 +56,7 @@ use ::tarantool::index::Part; use ::tarantool::proc; use ::tarantool::space::FieldType as SFT; use ::tarantool::space::SpaceId; +use ::tarantool::time::Instant; use ::tarantool::tlua; use ::tarantool::transaction::transaction; use ::tarantool::tuple::Decode; @@ -69,7 +69,6 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::rc::Rc; use std::time::Duration; -use std::time::Instant; use ApplyEntryResult::*; type RawNode = raft::RawNode<RaftSpaceAccess>; @@ -233,12 +232,12 @@ impl Node { /// /// **This function yields** pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> { - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); let notify = self.raw_operation(|node_impl| node_impl.read_index_async())?; let index: RaftIndex = fiber::block_on(notify.recv_timeout(timeout))?; - self.wait_index(index, deadline.saturating_duration_since(Instant::now())) + self.wait_index(index, deadline.duration_since(fiber::clock())) } /// Waits for [`RaftIndex`] to be applied to the storage locally. @@ -250,16 +249,14 @@ impl Node { /// **This function yields** #[inline] pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> { - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); loop { let current = self.get_index(); if current >= target { return Ok(current); } - if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { - event::wait_timeout(event::Event::EntryApplied, timeout)?; - } else { + if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() { return Err(Error::Timeout); } } @@ -329,7 +326,7 @@ impl Node { req: rpc::join::Request, timeout: Duration, ) -> traft::Result<(Box<Instance>, HashSet<Address>)> { - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); loop { let instance = self @@ -365,10 +362,7 @@ impl Node { ($res:expr) => { match $res { Ok((index, term)) => { - self.wait_index( - index, - deadline.saturating_duration_since(Instant::now()), - )?; + self.wait_index(index, deadline.duration_since(fiber::clock()))?; if term != raft::Storage::term(&self.raft_storage, index)? { // leader switched - retry self.wait_status(); @@ -395,7 +389,7 @@ impl Node { term: self.raft_storage.term()?, ranges: ranges.clone(), }, - deadline.saturating_duration_since(Instant::now()), + deadline.duration_since(fiber::clock()), )); handle_result!(cas::compare_and_swap( Op::Dml(op_instance), @@ -404,7 +398,7 @@ impl Node { term: self.raft_storage.term()?, ranges, }, - deadline.saturating_duration_since(Instant::now()), + deadline.duration_since(fiber::clock()), )); self.main_loop.wakeup(); @@ -424,7 +418,7 @@ impl Node { req: rpc::update_instance::Request, timeout: Duration, ) -> traft::Result<()> { - let deadline = instant_saturating_add(Instant::now(), timeout); + let deadline = fiber::clock().saturating_add(timeout); loop { let instance = self @@ -448,11 +442,11 @@ impl Node { term: self.raft_storage.term()?, ranges, }, - deadline.saturating_duration_since(Instant::now()), + deadline.duration_since(fiber::clock()), ); match res { Ok((index, term)) => { - self.wait_index(index, deadline.saturating_duration_since(Instant::now()))?; + self.wait_index(index, deadline.duration_since(fiber::clock()))?; if term != raft::Storage::term(&self.raft_storage, index)? { // leader switched - retry self.wait_status(); @@ -1695,7 +1689,7 @@ impl MainLoop { let now = Instant::now(); if now > state.next_tick { - state.next_tick = instant_saturating_add(now, Self::TICK); + state.next_tick = now.saturating_add(Self::TICK); node_impl.raw_node.tick(); } diff --git a/src/util.rs b/src/util.rs index 6a48009893d11a217dcc73032732c9c4c8be8a59..9d1c1850db7e97988b39bbb50b623780d622d94a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,22 +1,16 @@ use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN}; + +use std::any::{Any, TypeId}; use std::io::BufRead as _; use std::io::BufReader; use std::io::Write as _; use std::os::unix::io::AsRawFd as _; -pub use Either::{Left, Right}; +use std::time::Duration; use crate::traft::error::Error; +pub use Either::{Left, Right}; -use std::any::{Any, TypeId}; -use std::time::{Duration, Instant}; - -const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60); - -// TODO: move to tarantool_module when we have custom `Instant` there. -pub fn instant_saturating_add(t: Instant, d: Duration) -> Instant { - t.checked_add(d) - .unwrap_or_else(|| t.checked_add(INFINITY).expect("that's too much, man")) -} +pub const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60); //////////////////////////////////////////////////////////////////////////////// /// A generic enum that contains exactly one of two possible types. Equivalent