Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/picodata
1 result
Show changes
Commits on Source (4)
......@@ -236,6 +236,16 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
......@@ -1592,12 +1602,13 @@ checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tarantool"
version = "1.2.0"
version = "2.0.0"
dependencies = [
"async-trait",
"base64",
"bitflags",
"byteorder",
"crossbeam-queue",
"dec",
"dlopen",
"futures",
......
......@@ -40,7 +40,7 @@ features = ["max_level_trace", "release_max_level_trace"]
[dependencies.tarantool]
path = "./tarantool/tarantool"
version = "1.2.0"
version = "2.0"
features = ["picodata", "test"]
[dev-dependencies]
......
......@@ -14,7 +14,7 @@ RUN set -e; \
RUN set -e; \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
sh -s -- -y --profile default --default-toolchain 1.65.0
sh -s -- -y --profile default --default-toolchain 1.68.2
ENV PATH=/root/.cargo/bin:${PATH}
COPY ci-log-section /usr/bin/ci-log-section
......
......@@ -12,7 +12,6 @@ use crate::traft::Result;
use crate::traft::{EntryContext, EntryContextNormal};
use crate::traft::{RaftIndex, RaftTerm};
use crate::unwrap_ok_or;
use crate::util;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
......@@ -20,6 +19,7 @@ use ::raft::StorageError;
use tarantool::error::Error as TntError;
use tarantool::fiber;
use tarantool::fiber::r#async::sleep;
use tarantool::fiber::r#async::timeout::IntoTimeout;
use tarantool::space::{Space, SpaceId};
use tarantool::tlua;
......@@ -67,7 +67,7 @@ pub async fn compare_and_swap_async(
Err(e) => {
tlog!(Warning, "failed getting leader address: {e}");
tlog!(Info, "going to retry in a while...");
util::sleep_async(Duration::from_millis(250)).await;
sleep(Duration::from_millis(250)).await;
continue;
}
);
......
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::time::{Duration, Instant};
use std::time::Duration;
use tarantool::fiber;
use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
use tarantool::space::{Space, SystemSpace};
use tarantool::transaction::{transaction, TransactionError};
......@@ -24,7 +25,6 @@ use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op};
use crate::traft::{self, event, node, RaftIndex};
use crate::util::instant_saturating_add;
////////////////////////////////////////////////////////////////////////////////
// SpaceDef
......@@ -561,7 +561,7 @@ pub fn wait_for_ddl_commit(
timeout: Duration,
) -> traft::Result<RaftIndex> {
let raft_storage = &node::global()?.raft_storage;
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
let last_seen = prepare_commit;
loop {
let cur_applied = raft_storage.applied()?;
......@@ -579,9 +579,7 @@ pub fn wait_for_ddl_commit(
}
}
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
return Err(Error::Timeout);
}
}
......@@ -594,15 +592,13 @@ fn wait_for_no_pending_schema_change(
storage: &Clusterwide,
timeout: Duration,
) -> traft::Result<()> {
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
loop {
if storage.properties.pending_schema_change()?.is_none() {
return Ok(());
}
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
return Err(Error::Timeout);
}
}
......
use ::tarantool::error::Error as TntError;
use ::tarantool::fiber;
use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType};
use ::tarantool::msgpack::{ArrayWriter, ValueIter};
use ::tarantool::space::UpdateOps;
......@@ -2409,6 +2408,8 @@ impl SchemaDef for PrivilegeDef {
////////////////////////////////////////////////////////////////////////////////
pub mod acl {
use tarantool::clock;
use super::*;
////////////////////////////////////////////////////////////////////////////
......@@ -2537,7 +2538,7 @@ pub mod acl {
let auth_map = HashMap::from([(auth.method, &auth.data)]);
let mut ops = UpdateOps::with_capacity(2);
ops.assign(USER_FIELD_AUTH, auth_map)?;
ops.assign(USER_FIELD_LAST_MODIFIED, fiber::time() as u64)?;
ops.assign(USER_FIELD_LAST_MODIFIED, clock::time64())?;
sys_user.update(&[user_id], ops)?;
Ok(())
}
......
......@@ -5,11 +5,10 @@ use ::tarantool::vclock::Vclock;
use ::tarantool::{fiber, proc};
use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant};
use std::time::Duration;
use crate::traft::network::IdOfInstance;
use crate::traft::{ConnectionPool, RaftIndex};
use crate::util::instant_saturating_add;
use crate::{rpc, traft};
#[derive(thiserror::Error, Debug)]
......@@ -75,14 +74,14 @@ fn proc_wait_vclock(target: Vclock, timeout: f64) -> Result<(Vclock,), TimeoutEr
///
pub fn wait_vclock(target: Vclock, timeout: Duration) -> Result<Vclock, TimeoutError> {
// TODO: this all should be a part of tarantool C API
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
loop {
let current = Vclock::current();
if current >= target {
return Ok(current);
}
if Instant::now() < deadline {
if fiber::clock() < deadline {
fiber::sleep(traft::node::MainLoop::TICK);
} else {
return Err(TimeoutError);
......
......@@ -3,10 +3,11 @@ use std::collections::{HashMap, LinkedList};
use std::fmt::Write;
use std::rc::Rc;
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::time::Duration;
use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex};
use ::tarantool::fiber::{self, mutex::MutexGuard, Cond, Mutex};
use ::tarantool::proc;
use ::tarantool::time::Instant;
use ::tarantool::unwrap_or;
use crate::tlog;
......@@ -93,6 +94,8 @@ impl WaitTimeout {
/// Waits for the event to happen or timeout to end.
///
/// Returns an error if the `EVENTS` is uninitialized.
///
/// **This function yields**
pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> {
let mut events = events()?;
let cond = events.regular_cond(event);
......@@ -106,19 +109,25 @@ pub fn wait_timeout(event: Event, timeout: Duration) -> Result<WaitTimeout> {
}
/// Waits for the event to happen or deadline to be reached.
/// Uses the [`fiber::clock`] API to get the current instant.
///
/// Returns an error if the `EVENTS` is uninitialized.
///
/// **This function yields**
pub fn wait_deadline(event: Event, deadline: Instant) -> Result<WaitTimeout> {
let mut events = events()?;
let cond = events.regular_cond(event);
// events must be released before yielding
drop(events);
let timeout = deadline.saturating_duration_since(Instant::now());
Ok(if cond.wait_timeout(timeout) {
WaitTimeout::Signal
if let Some(timeout) = deadline.checked_duration_since(fiber::clock()) {
if cond.wait_timeout(timeout) {
Ok(WaitTimeout::Signal)
} else {
Ok(WaitTimeout::Timeout)
}
} else {
WaitTimeout::Timeout
})
Ok(WaitTimeout::Timeout)
}
}
/// Signals to everybody who's waiting for this `event` either repeated or one
......
......@@ -38,7 +38,6 @@ use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm;
use crate::traft::Topology;
use crate::util::instant_saturating_add;
use crate::util::AnyWithTypeName;
use crate::warn_or_panic;
use ::raft::prelude as raft;
......@@ -57,6 +56,7 @@ use ::tarantool::index::Part;
use ::tarantool::proc;
use ::tarantool::space::FieldType as SFT;
use ::tarantool::space::SpaceId;
use ::tarantool::time::Instant;
use ::tarantool::tlua;
use ::tarantool::transaction::transaction;
use ::tarantool::tuple::Decode;
......@@ -69,7 +69,6 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;
use ApplyEntryResult::*;
type RawNode = raft::RawNode<RaftSpaceAccess>;
......@@ -233,12 +232,12 @@ impl Node {
///
/// **This function yields**
pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> {
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
let notify = self.raw_operation(|node_impl| node_impl.read_index_async())?;
let index: RaftIndex = fiber::block_on(notify.recv_timeout(timeout))?;
self.wait_index(index, deadline.saturating_duration_since(Instant::now()))
self.wait_index(index, deadline.duration_since(fiber::clock()))
}
/// Waits for [`RaftIndex`] to be applied to the storage locally.
......@@ -250,16 +249,14 @@ impl Node {
/// **This function yields**
#[inline]
pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> {
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
loop {
let current = self.get_index();
if current >= target {
return Ok(current);
}
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
return Err(Error::Timeout);
}
}
......@@ -329,7 +326,7 @@ impl Node {
req: rpc::join::Request,
timeout: Duration,
) -> traft::Result<(Box<Instance>, HashSet<Address>)> {
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
loop {
let instance = self
......@@ -365,10 +362,7 @@ impl Node {
($res:expr) => {
match $res {
Ok((index, term)) => {
self.wait_index(
index,
deadline.saturating_duration_since(Instant::now()),
)?;
self.wait_index(index, deadline.duration_since(fiber::clock()))?;
if term != raft::Storage::term(&self.raft_storage, index)? {
// leader switched - retry
self.wait_status();
......@@ -395,7 +389,7 @@ impl Node {
term: self.raft_storage.term()?,
ranges: ranges.clone(),
},
deadline.saturating_duration_since(Instant::now()),
deadline.duration_since(fiber::clock()),
));
handle_result!(cas::compare_and_swap(
Op::Dml(op_instance),
......@@ -404,7 +398,7 @@ impl Node {
term: self.raft_storage.term()?,
ranges,
},
deadline.saturating_duration_since(Instant::now()),
deadline.duration_since(fiber::clock()),
));
self.main_loop.wakeup();
......@@ -424,7 +418,7 @@ impl Node {
req: rpc::update_instance::Request,
timeout: Duration,
) -> traft::Result<()> {
let deadline = instant_saturating_add(Instant::now(), timeout);
let deadline = fiber::clock().saturating_add(timeout);
loop {
let instance = self
......@@ -448,11 +442,11 @@ impl Node {
term: self.raft_storage.term()?,
ranges,
},
deadline.saturating_duration_since(Instant::now()),
deadline.duration_since(fiber::clock()),
);
match res {
Ok((index, term)) => {
self.wait_index(index, deadline.saturating_duration_since(Instant::now()))?;
self.wait_index(index, deadline.duration_since(fiber::clock()))?;
if term != raft::Storage::term(&self.raft_storage, index)? {
// leader switched - retry
self.wait_status();
......@@ -1695,7 +1689,7 @@ impl MainLoop {
let now = Instant::now();
if now > state.next_tick {
state.next_tick = instant_saturating_add(now, Self::TICK);
state.next_tick = now.saturating_add(Self::TICK);
node_impl.raw_node.tick();
}
......
use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN};
use std::any::{Any, TypeId};
use std::io::BufRead as _;
use std::io::BufReader;
use std::io::Write as _;
use std::os::unix::io::AsRawFd as _;
use tarantool::fiber;
use tarantool::fiber::r#async::timeout::IntoTimeout;
pub use Either::{Left, Right};
use std::time::Duration;
use crate::traft::error::Error;
pub use Either::{Left, Right};
use std::any::{Any, TypeId};
use std::time::{Duration, Instant};
const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
// TODO: move to tarantool_module when we have custom `Instant` there.
pub fn instant_saturating_add(t: Instant, d: Duration) -> Instant {
t.checked_add(d)
.unwrap_or_else(|| t.checked_add(INFINITY).expect("that's too much, man"))
}
// TODO: move to tarantool_module
pub async fn sleep_async(time: Duration) {
let (tx, rx) = fiber::r#async::oneshot::channel::<()>();
rx.timeout(time).await.unwrap_err();
drop(tx);
}
pub const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
////////////////////////////////////////////////////////////////////////////////
/// A generic enum that contains exactly one of two possible types. Equivalent
......@@ -562,16 +547,3 @@ mod tests {
);
}
}
mod tarantool_tests {
use std::time::Duration;
use ::tarantool::fiber;
#[::tarantool::test]
fn sleep_wakes_up() {
let should_yield =
fiber::check_yield(|| fiber::block_on(super::sleep_async(Duration::from_millis(10))));
assert_eq!(should_yield, fiber::YieldResult::Yielded(()));
}
}
Subproject commit 1b5a17f26e35737eb186acf295e79a16db87c118
Subproject commit f2c74995a4471c30fe8a4f3388c1dc30532cf5f8