Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/picodata
1 result
Show changes
Commits on Source (4)
...@@ -236,6 +236,16 @@ dependencies = [ ...@@ -236,6 +236,16 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.16" version = "0.8.16"
...@@ -1592,12 +1602,13 @@ checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" ...@@ -1592,12 +1602,13 @@ checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]] [[package]]
name = "tarantool" name = "tarantool"
version = "1.2.0" version = "2.0.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64", "base64",
"bitflags", "bitflags",
"byteorder", "byteorder",
"crossbeam-queue",
"dec", "dec",
"dlopen", "dlopen",
"futures", "futures",
......
...@@ -40,7 +40,7 @@ features = ["max_level_trace", "release_max_level_trace"] ...@@ -40,7 +40,7 @@ features = ["max_level_trace", "release_max_level_trace"]
[dependencies.tarantool] [dependencies.tarantool]
path = "./tarantool/tarantool" path = "./tarantool/tarantool"
version = "1.2.0" version = "2.0"
features = ["picodata", "test"] features = ["picodata", "test"]
[dev-dependencies] [dev-dependencies]
......
...@@ -14,7 +14,7 @@ RUN set -e; \ ...@@ -14,7 +14,7 @@ RUN set -e; \
RUN set -e; \ RUN set -e; \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
sh -s -- -y --profile default --default-toolchain 1.65.0 sh -s -- -y --profile default --default-toolchain 1.68.2
ENV PATH=/root/.cargo/bin:${PATH} ENV PATH=/root/.cargo/bin:${PATH}
COPY ci-log-section /usr/bin/ci-log-section COPY ci-log-section /usr/bin/ci-log-section
......
...@@ -12,7 +12,6 @@ use crate::traft::Result; ...@@ -12,7 +12,6 @@ use crate::traft::Result;
use crate::traft::{EntryContext, EntryContextNormal}; use crate::traft::{EntryContext, EntryContextNormal};
use crate::traft::{RaftIndex, RaftTerm}; use crate::traft::{RaftIndex, RaftTerm};
use crate::unwrap_ok_or; use crate::unwrap_ok_or;
use crate::util;
use ::raft::prelude as raft; use ::raft::prelude as raft;
use ::raft::Error as RaftError; use ::raft::Error as RaftError;
...@@ -20,6 +19,7 @@ use ::raft::StorageError; ...@@ -20,6 +19,7 @@ use ::raft::StorageError;
use tarantool::error::Error as TntError; use tarantool::error::Error as TntError;
use tarantool::fiber; use tarantool::fiber;
use tarantool::fiber::r#async::sleep;
use tarantool::fiber::r#async::timeout::IntoTimeout; use tarantool::fiber::r#async::timeout::IntoTimeout;
use tarantool::space::{Space, SpaceId}; use tarantool::space::{Space, SpaceId};
use tarantool::tlua; use tarantool::tlua;
...@@ -67,7 +67,7 @@ pub async fn compare_and_swap_async( ...@@ -67,7 +67,7 @@ pub async fn compare_and_swap_async(
Err(e) => { Err(e) => {
tlog!(Warning, "failed getting leader address: {e}"); tlog!(Warning, "failed getting leader address: {e}");
tlog!(Info, "going to retry in a while..."); tlog!(Info, "going to retry in a while...");
util::sleep_async(Duration::from_millis(250)).await; sleep(Duration::from_millis(250)).await;
continue; continue;
} }
); );
......
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::time::{Duration, Instant}; use std::time::Duration;
use tarantool::fiber;
use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType}; use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
use tarantool::space::{Space, SystemSpace}; use tarantool::space::{Space, SystemSpace};
use tarantool::transaction::{transaction, TransactionError}; use tarantool::transaction::{transaction, TransactionError};
...@@ -24,7 +25,6 @@ use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; ...@@ -24,7 +25,6 @@ use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
use crate::traft::error::Error; use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op}; use crate::traft::op::{Ddl, Op};
use crate::traft::{self, event, node, RaftIndex}; use crate::traft::{self, event, node, RaftIndex};
use crate::util::instant_saturating_add;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// SpaceDef // SpaceDef
...@@ -561,7 +561,7 @@ pub fn wait_for_ddl_commit( ...@@ -561,7 +561,7 @@ pub fn wait_for_ddl_commit(
timeout: Duration, timeout: Duration,
) -> traft::Result<RaftIndex> { ) -> traft::Result<RaftIndex> {
let raft_storage = &node::global()?.raft_storage; let raft_storage = &node::global()?.raft_storage;
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
let last_seen = prepare_commit; let last_seen = prepare_commit;
loop { loop {
let cur_applied = raft_storage.applied()?; let cur_applied = raft_storage.applied()?;
...@@ -579,9 +579,7 @@ pub fn wait_for_ddl_commit( ...@@ -579,9 +579,7 @@ pub fn wait_for_ddl_commit(
} }
} }
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
return Err(Error::Timeout); return Err(Error::Timeout);
} }
} }
...@@ -594,15 +592,13 @@ fn wait_for_no_pending_schema_change( ...@@ -594,15 +592,13 @@ fn wait_for_no_pending_schema_change(
storage: &Clusterwide, storage: &Clusterwide,
timeout: Duration, timeout: Duration,
) -> traft::Result<()> { ) -> traft::Result<()> {
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
loop { loop {
if storage.properties.pending_schema_change()?.is_none() { if storage.properties.pending_schema_change()?.is_none() {
return Ok(()); return Ok(());
} }
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
return Err(Error::Timeout); return Err(Error::Timeout);
} }
} }
......
use ::tarantool::error::Error as TntError; use ::tarantool::error::Error as TntError;
use ::tarantool::fiber;
use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType}; use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType};
use ::tarantool::msgpack::{ArrayWriter, ValueIter}; use ::tarantool::msgpack::{ArrayWriter, ValueIter};
use ::tarantool::space::UpdateOps; use ::tarantool::space::UpdateOps;
...@@ -2409,6 +2408,8 @@ impl SchemaDef for PrivilegeDef { ...@@ -2409,6 +2408,8 @@ impl SchemaDef for PrivilegeDef {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
pub mod acl { pub mod acl {
use tarantool::clock;
use super::*; use super::*;
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
...@@ -2537,7 +2538,7 @@ pub mod acl { ...@@ -2537,7 +2538,7 @@ pub mod acl {
let auth_map = HashMap::from([(auth.method, &auth.data)]); let auth_map = HashMap::from([(auth.method, &auth.data)]);
let mut ops = UpdateOps::with_capacity(2); let mut ops = UpdateOps::with_capacity(2);
ops.assign(USER_FIELD_AUTH, auth_map)?; ops.assign(USER_FIELD_AUTH, auth_map)?;
ops.assign(USER_FIELD_LAST_MODIFIED, fiber::time() as u64)?; ops.assign(USER_FIELD_LAST_MODIFIED, clock::time64())?;
sys_user.update(&[user_id], ops)?; sys_user.update(&[user_id], ops)?;
Ok(()) Ok(())
} }
......
...@@ -5,11 +5,10 @@ use ::tarantool::vclock::Vclock; ...@@ -5,11 +5,10 @@ use ::tarantool::vclock::Vclock;
use ::tarantool::{fiber, proc}; use ::tarantool::{fiber, proc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant}; use std::time::Duration;
use crate::traft::network::IdOfInstance; use crate::traft::network::IdOfInstance;
use crate::traft::{ConnectionPool, RaftIndex}; use crate::traft::{ConnectionPool, RaftIndex};
use crate::util::instant_saturating_add;
use crate::{rpc, traft}; use crate::{rpc, traft};
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
...@@ -75,14 +74,14 @@ fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutEr ...@@ -75,14 +74,14 @@ fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutEr
/// ///
pub fn wait_vclock(target: Vclock, timeout: Duration) -> Result<Vclock, TimeoutError> { pub fn wait_vclock(target: Vclock, timeout: Duration) -> Result<Vclock, TimeoutError> {
// TODO: this all should be a part of tarantool C API // TODO: this all should be a part of tarantool C API
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
loop { loop {
let current = Vclock::current(); let current = Vclock::current();
if current >= target { if current >= target {
return Ok(current); return Ok(current);
} }
if Instant::now() < deadline { if fiber::clock() < deadline {
fiber::sleep(traft::node::MainLoop::TICK); fiber::sleep(traft::node::MainLoop::TICK);
} else { } else {
return Err(TimeoutError); return Err(TimeoutError);
......
...@@ -3,10 +3,11 @@ use std::collections::{HashMap, LinkedList}; ...@@ -3,10 +3,11 @@ use std::collections::{HashMap, LinkedList};
use std::fmt::Write; use std::fmt::Write;
use std::rc::Rc; use std::rc::Rc;
use std::str::FromStr; use std::str::FromStr;
use std::time::{Duration, Instant}; use std::time::Duration;
use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex}; use ::tarantool::fiber::{self, mutex::MutexGuard, Cond, Mutex};
use ::tarantool::proc; use ::tarantool::proc;
use ::tarantool::time::Instant;
use ::tarantool::unwrap_or; use ::tarantool::unwrap_or;
use crate::tlog; use crate::tlog;
...@@ -93,6 +94,8 @@ impl WaitTimeout { ...@@ -93,6 +94,8 @@ impl WaitTimeout {
/// Waits for the event to happen or timeout to end. /// Waits for the event to happen or timeout to end.
/// ///
/// Returns an error if the `EVENTS` is uninitialized. /// Returns an error if the `EVENTS` is uninitialized.
///
/// **This function yields**
pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> { pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> {
let mut events = events()?; let mut events = events()?;
let cond = events.regular_cond(event); let cond = events.regular_cond(event);
...@@ -106,19 +109,25 @@ pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> { ...@@ -106,19 +109,25 @@ pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> {
} }
/// Waits for the event to happen or deadline to be reached. /// Waits for the event to happen or deadline to be reached.
/// Uses the [`fiber::clock`] API to get the current instant.
/// ///
/// Returns an error if the `EVENTS` is uninitialized. /// Returns an error if the `EVENTS` is uninitialized.
///
/// **This function yields**
pub fn wait_deadline(event: Event, deadline: Instant) -> Result<WaitTimeout> { pub fn wait_deadline(event: Event, deadline: Instant) -> Result<WaitTimeout> {
let mut events = events()?; let mut events = events()?;
let cond = events.regular_cond(event); let cond = events.regular_cond(event);
// events must be released before yielding // events must be released before yielding
drop(events); drop(events);
let timeout = deadline.saturating_duration_since(Instant::now()); if let Some(timeout) = deadline.checked_duration_since(fiber::clock()) {
Ok(if cond.wait_timeout(timeout) { if cond.wait_timeout(timeout) {
WaitTimeout::Signal Ok(WaitTimeout::Signal)
} else {
Ok(WaitTimeout::Timeout)
}
} else { } else {
WaitTimeout::Timeout Ok(WaitTimeout::Timeout)
}) }
} }
/// Signals to everybody who's waiting for this `event` either repeated or one /// Signals to everybody who's waiting for this `event` either repeated or one
......
...@@ -38,7 +38,6 @@ use crate::traft::RaftIndex; ...@@ -38,7 +38,6 @@ use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess; use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm; use crate::traft::RaftTerm;
use crate::traft::Topology; use crate::traft::Topology;
use crate::util::instant_saturating_add;
use crate::util::AnyWithTypeName; use crate::util::AnyWithTypeName;
use crate::warn_or_panic; use crate::warn_or_panic;
use ::raft::prelude as raft; use ::raft::prelude as raft;
...@@ -57,6 +56,7 @@ use ::tarantool::index::Part; ...@@ -57,6 +56,7 @@ use ::tarantool::index::Part;
use ::tarantool::proc; use ::tarantool::proc;
use ::tarantool::space::FieldType as SFT; use ::tarantool::space::FieldType as SFT;
use ::tarantool::space::SpaceId; use ::tarantool::space::SpaceId;
use ::tarantool::time::Instant;
use ::tarantool::tlua; use ::tarantool::tlua;
use ::tarantool::transaction::transaction; use ::tarantool::transaction::transaction;
use ::tarantool::tuple::Decode; use ::tarantool::tuple::Decode;
...@@ -69,7 +69,6 @@ use std::collections::{HashMap, HashSet}; ...@@ -69,7 +69,6 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use std::time::Instant;
use ApplyEntryResult::*; use ApplyEntryResult::*;
type RawNode = raft::RawNode<RaftSpaceAccess>; type RawNode = raft::RawNode<RaftSpaceAccess>;
...@@ -233,12 +232,12 @@ impl Node { ...@@ -233,12 +232,12 @@ impl Node {
/// ///
/// **This function yields** /// **This function yields**
pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> { pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> {
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
let notify = self.raw_operation(|node_impl| node_impl.read_index_async())?; let notify = self.raw_operation(|node_impl| node_impl.read_index_async())?;
let index: RaftIndex = fiber::block_on(notify.recv_timeout(timeout))?; let index: RaftIndex = fiber::block_on(notify.recv_timeout(timeout))?;
self.wait_index(index, deadline.saturating_duration_since(Instant::now())) self.wait_index(index, deadline.duration_since(fiber::clock()))
} }
/// Waits for [`RaftIndex`] to be applied to the storage locally. /// Waits for [`RaftIndex`] to be applied to the storage locally.
...@@ -250,16 +249,14 @@ impl Node { ...@@ -250,16 +249,14 @@ impl Node {
/// **This function yields** /// **This function yields**
#[inline] #[inline]
pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> { pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> {
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
loop { loop {
let current = self.get_index(); let current = self.get_index();
if current >= target { if current >= target {
return Ok(current); return Ok(current);
} }
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
return Err(Error::Timeout); return Err(Error::Timeout);
} }
} }
...@@ -329,7 +326,7 @@ impl Node { ...@@ -329,7 +326,7 @@ impl Node {
req: rpc::join::Request, req: rpc::join::Request,
timeout: Duration, timeout: Duration,
) -> traft::Result<(Box<Instance>, HashSet<Address>)> { ) -> traft::Result<(Box<Instance>, HashSet<Address>)> {
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
loop { loop {
let instance = self let instance = self
...@@ -365,10 +362,7 @@ impl Node { ...@@ -365,10 +362,7 @@ impl Node {
($res:expr) => { ($res:expr) => {
match $res { match $res {
Ok((index, term)) => { Ok((index, term)) => {
self.wait_index( self.wait_index(index, deadline.duration_since(fiber::clock()))?;
index,
deadline.saturating_duration_since(Instant::now()),
)?;
if term != raft::Storage::term(&self.raft_storage, index)? { if term != raft::Storage::term(&self.raft_storage, index)? {
// leader switched - retry // leader switched - retry
self.wait_status(); self.wait_status();
...@@ -395,7 +389,7 @@ impl Node { ...@@ -395,7 +389,7 @@ impl Node {
term: self.raft_storage.term()?, term: self.raft_storage.term()?,
ranges: ranges.clone(), ranges: ranges.clone(),
}, },
deadline.saturating_duration_since(Instant::now()), deadline.duration_since(fiber::clock()),
)); ));
handle_result!(cas::compare_and_swap( handle_result!(cas::compare_and_swap(
Op::Dml(op_instance), Op::Dml(op_instance),
...@@ -404,7 +398,7 @@ impl Node { ...@@ -404,7 +398,7 @@ impl Node {
term: self.raft_storage.term()?, term: self.raft_storage.term()?,
ranges, ranges,
}, },
deadline.saturating_duration_since(Instant::now()), deadline.duration_since(fiber::clock()),
)); ));
self.main_loop.wakeup(); self.main_loop.wakeup();
...@@ -424,7 +418,7 @@ impl Node { ...@@ -424,7 +418,7 @@ impl Node {
req: rpc::update_instance::Request, req: rpc::update_instance::Request,
timeout: Duration, timeout: Duration,
) -> traft::Result<()> { ) -> traft::Result<()> {
let deadline = instant_saturating_add(Instant::now(), timeout); let deadline = fiber::clock().saturating_add(timeout);
loop { loop {
let instance = self let instance = self
...@@ -448,11 +442,11 @@ impl Node { ...@@ -448,11 +442,11 @@ impl Node {
term: self.raft_storage.term()?, term: self.raft_storage.term()?,
ranges, ranges,
}, },
deadline.saturating_duration_since(Instant::now()), deadline.duration_since(fiber::clock()),
); );
match res { match res {
Ok((index, term)) => { Ok((index, term)) => {
self.wait_index(index, deadline.saturating_duration_since(Instant::now()))?; self.wait_index(index, deadline.duration_since(fiber::clock()))?;
if term != raft::Storage::term(&self.raft_storage, index)? { if term != raft::Storage::term(&self.raft_storage, index)? {
// leader switched - retry // leader switched - retry
self.wait_status(); self.wait_status();
...@@ -1695,7 +1689,7 @@ impl MainLoop { ...@@ -1695,7 +1689,7 @@ impl MainLoop {
let now = Instant::now(); let now = Instant::now();
if now > state.next_tick { if now > state.next_tick {
state.next_tick = instant_saturating_add(now, Self::TICK); state.next_tick = now.saturating_add(Self::TICK);
node_impl.raw_node.tick(); node_impl.raw_node.tick();
} }
......
use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN}; use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN};
use std::any::{Any, TypeId};
use std::io::BufRead as _; use std::io::BufRead as _;
use std::io::BufReader; use std::io::BufReader;
use std::io::Write as _; use std::io::Write as _;
use std::os::unix::io::AsRawFd as _; use std::os::unix::io::AsRawFd as _;
use tarantool::fiber; use std::time::Duration;
use tarantool::fiber::r#async::timeout::IntoTimeout;
pub use Either::{Left, Right};
use crate::traft::error::Error; use crate::traft::error::Error;
pub use Either::{Left, Right};
use std::any::{Any, TypeId}; pub const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
use std::time::{Duration, Instant};
const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
// TODO: move to tarantool_module when we have custom `Instant` there.
pub fn instant_saturating_add(t: Instant, d: Duration) -> Instant {
t.checked_add(d)
.unwrap_or_else(|| t.checked_add(INFINITY).expect("that's too much, man"))
}
// TODO: move to tarantool_module
pub async fn sleep_async(time: Duration) {
let (tx, rx) = fiber::r#async::oneshot::channel::<()>();
rx.timeout(time).await.unwrap_err();
drop(tx);
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// A generic enum that contains exactly one of two possible types. Equivalent /// A generic enum that contains exactly one of two possible types. Equivalent
...@@ -562,16 +547,3 @@ mod tests { ...@@ -562,16 +547,3 @@ mod tests {
); );
} }
} }
mod tarantool_tests {
use std::time::Duration;
use ::tarantool::fiber;
#[::tarantool::test]
fn sleep_wakes_up() {
let should_yield =
fiber::check_yield(|| fiber::block_on(super::sleep_async(Duration::from_millis(10))));
assert_eq!(should_yield, fiber::YieldResult::Yielded(()));
}
}
Subproject commit 1b5a17f26e35737eb186acf295e79a16db87c118 Subproject commit f2c74995a4471c30fe8a4f3388c1dc30532cf5f8