Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/tarantool-module
1 result
Show changes
Commits on Source (16)
Showing
with 1287 additions and 136 deletions
......@@ -7,14 +7,25 @@
log::Level to SayLevel taking the mapping function into account.
- `tlua::Push` and `tlua::LuaRead` implementations for `SayLevel`.
- `examples/luaopen` example of how to implement native lua modules.
- `tarantool::cbus` module for communication between any arbitrary thread and
tarantool thread via syncronization primitives (channels) and low-level cbus api.
- `tarantool::time::Instant` a custom implementation of std-like `Instant` with more saturating operations
and support of the `fiber_clock` API.
- `r#async::sleep` - an async friendly analog of `fiber::sleep`.
### Fixed
- `log::Log::enabled` implementation for TarantoolLogger no longer ignores the
mapping provided at construction.
- A copy of fiber name used to leak in `Fiber::new` and `Fiber::new_with_attr`.
- `tarantool::decimal` api is now thread safe, which allows it to be used in concurrent threads.
### Breaking Changes
- `transaction::start_transaction` has a more flexible error handling,
and is renamed to `transaction::transaction`
- `fiber::clock` now returns `tarantool::time::Instant`
- `fiber::time` and `fiber::time64` returning non-monotonic time removed. If
calendar time is needed, use `std::time::SystemTime`.
- `fiber::clock64` removed in favor of a new `Instant` based `fiber::clock` API
# [1.1.0] June 16 2023
......
......@@ -57,7 +57,7 @@ rustflags = [
Add the following lines to your project's Cargo.toml:
```toml
[dependencies]
tarantool = "1.1"
tarantool = "2.0"
[lib]
crate-type = ["cdylib"]
......@@ -104,7 +104,7 @@ edition = "2018"
# author, license, etc
[dependencies]
tarantool = "1.1"
tarantool = "2.0"
serde = "1.0"
[lib]
......@@ -357,7 +357,7 @@ use tarantool::{
error::Error,
fiber::sleep,
space::Space,
transaction::start_transaction,
transaction::transaction,
};
#[proc]
......@@ -367,7 +367,7 @@ fn write() -> Result<(i32, String), String> {
let row = (1, "22".to_string());
start_transaction(|| -> Result<(), Error> {
transaction(|| -> Result<(), Error> {
space.replace(&row)?;
Ok(())
})
......
......@@ -10,7 +10,7 @@ hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
futures-util = "*"
http-body-util = "0.1.0-rc.2"
env_logger = "*"
env_logger = "0.9.0"
bytes = "*"
serde = "*"
serde_json = "*"
......
use std::{
future::Future,
time::{Duration, Instant},
};
use std::{future::Future, time::Duration};
use tarantool::{
fiber,
net_box::{Conn, ConnOptions, Options},
network::client::{AsClient as _, Client},
proc,
time::Instant,
};
const N_ITERS: usize = 100_000;
......
[package]
name = "tarantool"
description = "Tarantool rust bindings"
version = "1.2.0"
version = "2.0.0"
authors = [
"Dmitriy Koltsov <dkoltsov@picodata.io>",
"Georgy Moshkin <gmoshkin@picodata.io>",
......@@ -32,9 +32,9 @@ num-derive = "0.3"
once_cell = "1.4.0"
tlua = { path = "../tlua", version = "1.0.0" }
refpool = { version = "0.4.3", optional = true }
rmp = "0.8"
rmp = "=0.8.11"
rmp-serde = "=1.0.0"
rmpv = { version = "^1.0", features = ["with-serde"] }
rmpv = { version = "=1.0.0", features = ["with-serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_bytes = "^0"
......@@ -46,6 +46,7 @@ linkme = "0.2.10"
async-trait = "0.1.64"
tester = { version = "0.7.0", optional = true }
time = ">=0.3.0, <0.3.18"
crossbeam-queue = "0.3.8"
[target.'cfg(not(all(target_arch = "aarch64", target_os = "macos")))'.dependencies]
va_list = "0.1.3"
......
#![cfg(any(feature = "picodata", doc))]
//! Tarantool cbus integration and channels.
//!
//! Original cbus provides a means of communication between separate threads:
//! - cpipe - channel between two cords. Cord is a separate thread with `libev` event loop inside it.
//! - lcpipe - channel between any arbitrary thread and cord.
//!
//! For the purposes of communication tx thread (where code of stored proc is working) and external threads
//! we should use a `lcpipe`.
//!
//! ## lcpipe schema
//!
//! Let's see how `lcpipe` woks, there are a number of entities participating in the exchange of messages:
//! - message - a unit of communication, may have a user defined payload
//! - message hop - defines how message will be handled on consumer side
//! - endpoint - message consumer, identified by name. Any endpoint occupies a single fiber for execute a cbus loop
//! - lcpipe - delivery message from producer to the consumer (endpoint), delivery never blocks consumer or producer
//!
//! Now schematically:
//!
//! ```text
//! TX thread
//! ┌────────────────────────┐
//! ┌───────────────┐ msg 1 │ │
//! │ ├─────────┐ lcpipe 1│ ┌───────────┐ │
//! │ thread 1 │ msg 2 ├──────────┼─► │endpoint 1 │fiber 1 │
//! │ ├─────────│ │ └───────────┘ │
//! └───────────────┘ │ │ │
//! │ lcpipe 2│ ┌───────────┐ │
//! ┌───────────────┐ ├──────────┼─► │endpoint 2 │fiber 2 │
//! │ │ msg 3 │ │ └───────────┘ │
//! │ thread 2 ├─────────┤ │ │
//! │ ├─────────┤ │ │
//! └───────────────┘ msg 4 │ │ .... │
//! │ │ │
//! ┌───────────────┐ msg 5 │ │ ┌───────────┐ │
//! │ ├─────────┤ │ │endpoint N │fiber N │
//! │ thread 3 ├─────────┘ │ └───────────┘ │
//! │ │ msg 6 │ │
//! └───────────────┘ │ │
//! └────────────────────────┘
//!```
//!
//! ## Cbus based channels
//!
//! The main idea of cbus based channels - use `lcpipe` to send a message about the need to unlock the consumer fiber.
//! Unlock consumer always means that there is a new data for consuming, but consumer not always locking
//! on try to receiver, if data is already available - lock is redundant.
//! For implementing a consumer lock and unlock a [`crate::fiber::Cond`] is used.
pub mod oneshot;
pub mod unbounded;
use crate::ffi;
use crate::ffi::tarantool::{
cbus_endpoint_delete, cbus_endpoint_new, cbus_loop, lcpipe_delete, lcpipe_new, lcpipe_push_now,
};
use std::ffi::CString;
use std::os::raw::c_void;
use std::ptr;
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
#[error("sending half of a channel is disconnected")]
Disconnected,
}
#[derive(Debug, thiserror::Error)]
pub enum CbusError {
#[error("endpoint with given name already registered")]
EndpointAlreadyExists,
}
#[repr(C)]
struct STailQEntry {
next: *const STailQEntry,
}
/// One hop in a message travel route. Next destination defined by `_pipe` field,
/// but for `lcpipe` there is only one hop supported, so `_pipe` field must always be NULL.
#[repr(C)]
pub struct MessageHop {
f: unsafe fn(*mut c_void),
_pipe: *const c_void,
}
/// A message traveling between thread and cord.
#[repr(C)]
pub struct Message<T> {
fifo: STailQEntry,
route: *mut MessageHop,
hop: *mut MessageHop,
callback: Option<T>,
}
impl<F> Message<F>
where
F: FnOnce() + 'static,
{
unsafe fn trampoline(msg: *mut c_void) {
let msg = msg.cast::<Self>();
let mut msg = Box::from_raw(msg);
if let Some(callback) = msg.callback.take() {
callback();
}
}
/// Create a new cbus message.
///
/// # Arguments
///
/// * `callback`: executes when the message reaches destination endpoint
pub fn new(callback: F) -> Self {
let hop = MessageHop {
f: Self::trampoline,
_pipe: std::ptr::null(),
};
let hop = Box::new(hop);
let hop = Box::into_raw(hop);
Self {
fifo: STailQEntry { next: ptr::null() },
route: hop,
hop,
callback: Some(callback),
}
}
}
impl<T> Drop for Message<T> {
fn drop(&mut self) {
let hop = self.hop.cast::<MessageHop>();
drop(unsafe { Box::from_raw(hop) });
}
}
/// Cbus endpoint. Endpoint is a message consumer on a cord side.
pub struct Endpoint {
endpoint: *const (),
}
impl Endpoint {
/// Create a new cbus endpoint
///
/// # Arguments
///
/// * `name`: endpoint name
pub fn new(name: &str) -> Result<Self, CbusError> {
let mut endpoint: *mut () = ptr::null_mut();
let endpoint_ptr: *mut *mut () = &mut endpoint;
let name = CString::new(name).expect("endpoint name may not contain interior null bytes");
let err = unsafe { cbus_endpoint_new(endpoint_ptr as *mut *mut c_void, name.as_ptr()) };
if err != 0 {
return Err(CbusError::EndpointAlreadyExists);
}
Ok(Endpoint { endpoint })
}
/// Run the message delivery loop until the current fiber is cancelled.
pub fn cbus_loop(&self) {
unsafe { cbus_loop(self.endpoint as *mut c_void) }
}
}
impl Drop for Endpoint {
fn drop(&mut self) {
// return value is ignored cause, currently, there is no situation when deleting may fail
unsafe { cbus_endpoint_delete(self.endpoint as *mut c_void) };
}
}
/// A uni-directional FIFO queue from any thread to cord.
pub struct LCPipe {
pipe: *mut ffi::tarantool::LCPipe,
}
unsafe impl Send for LCPipe {}
unsafe impl Sync for LCPipe {}
impl LCPipe {
/// Create and initialize a pipe and connect it to the consumer.
/// The call returns only when the consumer, identified by endpoint name, has joined the bus.
pub fn new(endpoint_name: &str) -> Self {
let endpoint =
CString::new(endpoint_name).expect("endpoint name may not contain interior null bytes");
Self {
pipe: unsafe { lcpipe_new(endpoint.as_ptr()) },
}
}
/// Push a new message into pipe. Message will be flushed to consumer queue (but not handled) immediately.
pub fn push_message<T>(&self, msg: Message<T>) {
let msg = Box::new(msg);
// leaks a message, there is no `Box::from_raw` later, because it will happen implicitly
// when [`MessageHop::f`] called
let msg = Box::leak(msg);
unsafe { lcpipe_push_now(self.pipe, msg as *mut Message<T> as *mut c_void) }
}
}
impl Drop for LCPipe {
fn drop(&mut self) {
unsafe { lcpipe_delete(self.pipe) };
}
}
#[cfg(feature = "internal_test")]
mod tests {
use crate::cbus;
use crate::cbus::Message;
use crate::fiber::{Cond, Fiber};
use std::thread;
use std::thread::ThreadId;
pub(super) fn run_cbus_endpoint(endpoint_name: &str) -> Fiber<'static, ()> {
let mut fiber = Fiber::new("cbus_fiber", &mut |_: Box<()>| {
let cbus_endpoint = cbus::Endpoint::new(endpoint_name).unwrap();
cbus_endpoint.cbus_loop();
0
});
fiber.start(());
fiber
}
#[crate::test(tarantool = "crate")]
pub fn cbus_send_message_test() {
static mut TX_THREAD_ID: Option<ThreadId> = None;
static mut SENDER_THREAD_ID: Option<ThreadId> = None;
let mut cbus_fiber = run_cbus_endpoint("cbus_send_message_test");
struct CondPtr(*const Cond);
unsafe impl Send for CondPtr {}
let cond = Cond::new();
let cond_ptr = CondPtr(&cond as *const Cond);
let thread = thread::spawn(move || {
unsafe { SENDER_THREAD_ID = Some(thread::current().id()) };
let pipe = cbus::LCPipe::new("cbus_send_message_test");
let msg = Message::new(move || {
unsafe { TX_THREAD_ID = Some(thread::current().id()) };
let cond = unsafe { cond_ptr.0.as_ref().unwrap() };
cond.broadcast();
});
pipe.push_message(msg);
});
cond.wait();
unsafe {
assert!(SENDER_THREAD_ID.is_some());
assert!(TX_THREAD_ID.is_some());
assert_ne!(SENDER_THREAD_ID, TX_THREAD_ID);
}
thread.join().unwrap();
cbus_fiber.cancel();
}
}
use super::{LCPipe, Message};
use crate::cbus::RecvError;
use crate::fiber::Cond;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// A oneshot channel based on tarantool cbus. This a channel between any arbitrary thread and a cord.
/// Cord - a thread with `libev` event loop inside (typically tx thread).
struct Channel<T> {
message: UnsafeCell<Option<T>>,
/// Condition variable for synchronize consumer (cord) and producer,
/// using an [`Arc`] instead of raw pointer cause there is a situation
/// when channel dropped before cbus endpoint receive a cond
cond: Arc<Cond>,
/// Atomic flag, signaled that sender already have a data for receiver
ready: AtomicBool,
}
unsafe impl<T> Sync for Channel<T> where T: Send {}
unsafe impl<T> Send for Channel<T> where T: Send {}
impl<T> Channel<T> {
/// Create a new channel.
fn new() -> Self {
Self {
message: UnsafeCell::new(None),
ready: AtomicBool::new(false),
cond: Arc::new(Cond::new()),
}
}
}
/// A sending-half of oneshot channel. Can be used in any context (tarantool cord or arbitrary thread).
/// Messages can be sent through this channel with [`Sender::send`].
///
/// If sender dropped before [`Sender::send`] is calling then [`EndpointReceiver::receive`] will return with [`RecvError::Disconnected`].
/// It is safe to drop sender when [`EndpointReceiver::receive`] is not calling.
pub struct Sender<T> {
channel: Arc<Channel<T>>,
pipe: Arc<LCPipe>,
}
/// Receiver part of oneshot channel. Must be used in cord context.
pub struct EndpointReceiver<T> {
channel: Arc<Channel<T>>,
}
/// Creates a new oneshot channel, returning the sender/receiver halves with already created [`LCPipe`] instance.
/// This method is useful if you want to avoid any memory allocations.
/// Typically better use a [`channel`] method that create a new lcpipe instance,
/// lcpipe is pretty small structure so overhead is not big.
///
/// # Arguments
///
/// * `pipe`: lcpipe - a cbus communication channel
///
/// returns: (Sender<T>, Receiver<T>)
///
/// # Examples
///
/// ```no_run
/// #[cfg(feature = "picodata")] {
/// use std::sync::Arc;
/// use tarantool::cbus::oneshot;
/// use tarantool::cbus::LCPipe;
///
/// let pipe = LCPipe::new("some_endpoint");
/// let (sender, receiver) = oneshot::channel_on_pipe::<u8>(Arc::new(pipe));
/// }
/// ```
pub fn channel_on_pipe<T>(pipe: Arc<LCPipe>) -> (Sender<T>, EndpointReceiver<T>) {
let channel = Arc::new(Channel::new());
(
Sender {
channel: channel.clone(),
pipe,
},
EndpointReceiver { channel },
)
}
/// Creates a new oneshot channel, returning the sender/receiver halves. Please note that the receiver should only be used inside the cord.
///
/// # Arguments
///
/// * `cbus_endpoint`: cbus endpoint name. Note that the tx thread (or any other cord)
/// must have a fiber occupied by the endpoint cbus_loop.
///
/// returns: (Sender<T>, Receiver<T>)
///
/// # Examples
///
/// ```no_run
/// #[cfg(feature = "picodata")] {
/// use tarantool::cbus::oneshot;
/// let (sender, receiver) = oneshot::channel::<u8>("some_endpoint");
/// }
/// ```
pub fn channel<T>(cbus_endpoint: &str) -> (Sender<T>, EndpointReceiver<T>) {
channel_on_pipe(Arc::new(LCPipe::new(cbus_endpoint)))
}
impl<T> Sender<T> {
/// Attempts to send a value on this channel.
///
/// # Arguments
///
/// * `message`: message to send
pub fn send(self, message: T) {
unsafe { *self.channel.message.get() = Some(message) };
self.channel.ready.store(true, Ordering::Release);
// [`Sender`] dropped at this point and [`Cond::signal()`] happens on drop.
// Another words, [`Cond::signal()`] happens anyway, regardless of the existence of message in the channel.
// After that, the receiver interprets the lack of a message as a disconnect.
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let cond = Arc::clone(&self.channel.cond);
let msg = Message::new(move || {
cond.signal();
});
self.pipe.push_message(msg);
}
}
impl<T> EndpointReceiver<T> {
/// Attempts to wait for a value on this receiver, returns a [`RecvError`]
/// if the corresponding channel has hung up (sender was dropped).
pub fn receive(self) -> Result<T, RecvError> {
if !self.channel.ready.swap(false, Ordering::Acquire) {
// assume that situation when [`crate::fiber::Cond::signal()`] called before
// [`crate::fiber::Cond::wait()`] and after swap `ready` to false is never been happen,
// cause signal and wait both calling in tx thread (or any other cord) and there is now yields between it
self.channel.cond.wait();
}
unsafe {
self.channel
.message
.get()
.as_mut()
.expect("unexpected null pointer")
.take()
}
.ok_or(RecvError::Disconnected)
}
}
impl<T> Default for Channel<T> {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "internal_test")]
mod tests {
use super::super::tests::run_cbus_endpoint;
use crate::cbus;
use crate::cbus::{oneshot, RecvError};
use crate::fiber::{check_yield, YieldResult};
use std::sync::Arc;
use std::time::Duration;
use std::{mem, thread};
#[crate::test(tarantool = "crate")]
pub fn oneshot_test() {
let mut cbus_fiber = run_cbus_endpoint("oneshot_test");
let (sender, receiver) = oneshot::channel("oneshot_test");
let thread = thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
sender.send(1);
});
assert_eq!(
check_yield(|| { receiver.receive().unwrap() }),
YieldResult::Yielded(1)
);
thread.join().unwrap();
let (sender, receiver) = oneshot::channel("oneshot_test");
let thread = thread::spawn(move || {
sender.send(2);
});
thread.join().unwrap();
assert_eq!(
check_yield(|| { receiver.receive().unwrap() }),
YieldResult::DidntYield(2)
);
cbus_fiber.cancel();
}
#[crate::test(tarantool = "crate")]
pub fn oneshot_multiple_channels_test() {
let mut cbus_fiber = run_cbus_endpoint("oneshot_multiple_channels_test");
let pipe = cbus::LCPipe::new("oneshot_multiple_channels_test");
let pipe = Arc::new(pipe);
let (sender1, receiver1) = oneshot::channel_on_pipe(Arc::clone(&pipe));
let (sender2, receiver2) = oneshot::channel_on_pipe(Arc::clone(&pipe));
let thread1 = thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
sender1.send("1");
});
let thread2 = thread::spawn(move || {
thread::sleep(Duration::from_secs(2));
sender2.send("2");
});
let result2 = receiver2.receive();
let result1 = receiver1.receive();
assert!(matches!(result1, Ok("1")));
assert!(matches!(result2, Ok("2")));
thread1.join().unwrap();
thread2.join().unwrap();
cbus_fiber.cancel();
}
#[crate::test(tarantool = "crate")]
pub fn oneshot_sender_drop_test() {
let mut cbus_fiber = run_cbus_endpoint("oneshot_sender_drop_test");
let (sender, receiver) = oneshot::channel::<()>("oneshot_sender_drop_test");
let thread = thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
mem::drop(sender)
});
let result = receiver.receive();
assert!(matches!(result, Err(RecvError::Disconnected)));
thread.join().unwrap();
cbus_fiber.cancel();
}
}
use super::{LCPipe, Message};
use crate::cbus::RecvError;
use crate::fiber::Cond;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// A synchronization component between producers and a consumer.
struct Waker {
/// synchronize a waker, signal when waker is up to date
condition: Option<Arc<Cond>>,
/// indicate that waker already up to date
woken: AtomicBool,
/// pipe for sending syncronization signals
pipe: LCPipe,
}
impl Waker {
fn new(cond: Cond, pipe: LCPipe) -> Self {
Self {
condition: Some(Arc::new(cond)),
woken: AtomicBool::new(false),
pipe,
}
}
/// Send wakeup signal to a [`Waker::wait`] caller.
fn force_wakeup(&self, cond: Arc<Cond>) {
let msg = Message::new(move || {
cond.signal();
});
self.pipe.push_message(msg);
}
/// Release waker if it lock in [`Waker::wait`].
fn wakeup(&self) {
let do_wake = self
.woken
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok();
if do_wake {
let cond = Arc::clone(
self.condition
.as_ref()
.expect("unreachable: condition never empty"),
);
self.force_wakeup(cond);
}
}
/// Lock until waker is woken up, or return instantly if waker already woken.
fn wait(&self) {
while self
.woken
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
self.condition
.as_ref()
.expect("unreachable: condition never empty")
.wait();
}
}
}
impl Drop for Waker {
fn drop(&mut self) {
if let Some(cond) = self.condition.take() {
self.force_wakeup(cond);
}
}
}
/// A unbounded mpsc channel based on tarantool cbus.
/// This a channel between any arbitrary threads (producers) and a cord (consumer).
/// Cord - a thread with `libev` event loop inside (typically tx thread).
struct Channel<T> {
/// [`crossbeam_queue::SegQueue`] is used as lock free buffer, internally this is a linked list with buckets
list: crossbeam_queue::SegQueue<T>,
/// synchronize receiver and producers
waker: Waker,
/// indicate that all producers are disconnected from channel
disconnected: AtomicBool,
}
impl<T> Channel<T> {
/// Create a new channel.
///
/// # Arguments
///
/// * `pipe`: cbus lcpipe instance.
fn new(pipe: LCPipe) -> Self {
let cond = Cond::new();
Self {
list: crossbeam_queue::SegQueue::new(),
waker: Waker::new(cond, pipe),
disconnected: AtomicBool::new(false),
}
}
}
/// Creates a new unbounded channel, returning the sender/receiver halves. Please note that the receiver should only be used inside the cord.
///
/// # Arguments
///
/// * `cbus_endpoint`: cbus endpoint name. Note that the tx thread (or any other cord)
/// must have a fiber occupied by the endpoint cbus_loop.
///
/// returns: (Sender<T>, Receiver<T>)
///
/// # Examples
///
/// ```no_run
/// #[cfg(feature = "picodata")] {
/// use tarantool::cbus::unbounded;
/// let (sender, receiver) = unbounded::channel::<u8>("some_endpoint");
/// }
/// ```
pub fn channel<T>(cbus_endpoint: &str) -> (Sender<T>, EndpointReceiver<T>) {
let pipe = LCPipe::new(cbus_endpoint);
let chan = Arc::new(Channel::new(pipe));
let s = SenderInner {
chan: Arc::clone(&chan),
};
let r = EndpointReceiver {
chan: Arc::clone(&chan),
};
(Sender { inner: Arc::new(s) }, r)
}
struct SenderInner<T> {
chan: Arc<Channel<T>>,
}
unsafe impl<T> Send for SenderInner<T> {}
impl<T> Drop for SenderInner<T> {
fn drop(&mut self) {
self.chan.disconnected.store(true, Ordering::Release);
self.chan.waker.wakeup();
}
}
/// A sending-half of unbounded channel. Can be used in any context (tarantool cord or arbitrary thread).
/// Messages can be sent through this channel with [`Sender::send`].
/// Clone the sender if you need one more producer.
pub struct Sender<T> {
inner: Arc<SenderInner<T>>,
}
unsafe impl<T> Send for Sender<T> {}
unsafe impl<T> Sync for Sender<T> {}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> Sender<T> {
/// Attempts to send a value on this channel.
///
/// # Arguments
///
/// * `message`: message to send
pub fn send(&self, msg: T) {
self.inner.chan.list.push(msg);
// wake up a sleeping receiver
self.inner.chan.waker.wakeup();
}
}
/// Receiver part of unbounded channel. Must be used in cord context.
pub struct EndpointReceiver<T> {
chan: Arc<Channel<T>>,
}
unsafe impl<T> Send for EndpointReceiver<T> {}
impl<T> EndpointReceiver<T> {
/// Attempts to wait for a value on this receiver, returns a [`RecvError::Disconnected`]
/// when all of producers are dropped.
pub fn receive(&self) -> Result<T, RecvError> {
loop {
if let Some(msg) = self.chan.list.pop() {
return Ok(msg);
}
if self.chan.disconnected.load(Ordering::Acquire) {
return Err(RecvError::Disconnected);
}
self.chan.waker.wait();
}
}
/// Return message count in receiver buffer.
pub fn len(&self) -> usize {
self.chan.list.len()
}
/// Return true if receiver message buffer is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(feature = "internal_test")]
mod tests {
use super::super::tests::run_cbus_endpoint;
use crate::cbus::{unbounded, RecvError};
use crate::fiber;
use crate::fiber::{check_yield, YieldResult};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
#[crate::test(tarantool = "crate")]
pub fn unbounded_test() {
let mut cbus_fiber = run_cbus_endpoint("unbounded_test");
let (tx, rx) = unbounded::channel("unbounded_test");
let thread = thread::spawn(move || {
for i in 0..1000 {
tx.send(i);
if i % 100 == 0 {
thread::sleep(Duration::from_millis(1000));
}
}
});
assert_eq!(
check_yield(|| {
let mut recv_results = vec![];
for _ in 0..1000 {
recv_results.push(rx.receive().unwrap());
}
recv_results
}),
YieldResult::Yielded((0..1000).collect::<Vec<_>>())
);
thread.join().unwrap();
cbus_fiber.cancel();
}
#[crate::test(tarantool = "crate")]
pub fn unbounded_test_drop_rx_before_tx() {
// This test check that there is no memory corruption if sender part of channel drops after
// receiver part. Previously, when the receiver was drop after sender, [`Fiber::Cond`] release outside the tx thread
// and segfault is occurred.
let mut cbus_fiber = run_cbus_endpoint("unbounded_test_drop_rx_before_tx");
let (tx, rx) = unbounded::channel("unbounded_test_drop_rx_before_tx");
let thread = thread::spawn(move || {
for i in 1..300 {
tx.send(i);
if i % 100 == 0 {
thread::sleep(Duration::from_secs(1));
}
}
});
fiber::sleep(Duration::from_secs(1));
drop(rx);
thread.join().unwrap();
cbus_fiber.cancel();
}
#[crate::test(tarantool = "crate")]
pub fn unbounded_disconnect_test() {
let mut cbus_fiber = run_cbus_endpoint("unbounded_disconnect_test");
let (tx, rx) = unbounded::channel("unbounded_disconnect_test");
let thread = thread::spawn(move || {
tx.send(1);
tx.send(2);
});
assert!(matches!(rx.receive(), Ok(1)));
assert!(matches!(rx.receive(), Ok(2)));
assert!(matches!(rx.receive(), Err(RecvError::Disconnected)));
thread.join().unwrap();
cbus_fiber.cancel();
}
#[crate::test(tarantool = "crate")]
pub fn unbounded_mpsc_test() {
const MESSAGES_PER_PRODUCER: i32 = 10_000;
let mut cbus_fiber = run_cbus_endpoint("unbounded_mpsc_test");
let (tx, rx) = unbounded::channel("unbounded_mpsc_test");
fn create_producer(sender: unbounded::Sender<i32>) -> JoinHandle<()> {
thread::spawn(move || {
for i in 0..MESSAGES_PER_PRODUCER {
sender.send(i);
}
})
}
let jh1 = create_producer(tx.clone());
let jh2 = create_producer(tx.clone());
let jh3 = create_producer(tx);
for _ in 0..MESSAGES_PER_PRODUCER * 3 {
assert!(matches!(rx.receive(), Ok(_)));
}
assert!(matches!(rx.receive(), Err(RecvError::Disconnected)));
jh1.join().unwrap();
jh2.join().unwrap();
jh3.join().unwrap();
cbus_fiber.cancel();
}
}
//! A [`Decimal`] number implemented using the builtin tarantool api.
use crate::ffi::decimal as ffi;
use std::convert::{TryFrom, TryInto};
......@@ -6,13 +8,17 @@ use std::mem::size_of;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
/// A Decimal number implemented using the builtin tarantool api. **Note** that
/// this api is not available in all versions of tarantool.
/// A Decimal number implemented using the builtin tarantool api.
///
/// ## Availability
/// This api is not available in all versions of tarantool.
/// Use [`tarantool::ffi::has_decimal`] to check if it is supported in your
/// case.
/// If `has_decimal` return `false`, using any function from this module
/// If `has_decimal` returns `false`, using any function from this module
/// will result in a **panic**.
///
/// This API is thread-safe unlike the original tarantool decimal API.
///
/// [`tarantool::ffi::has_decimal`]: crate::ffi::has_decimal
#[derive(Debug, Copy, Clone)]
pub struct Decimal {
......@@ -118,15 +124,17 @@ impl Decimal {
}
let ndig = (self.precision() - self.scale() + scale as i32).max(1);
let mut ctx: Context = unsafe { &*CONTEXT }.clone();
ctx.set_precision(ndig as _).unwrap();
ctx.set_max_exponent(ndig as _).unwrap();
ctx.set_min_exponent(if scale != 0 { -1 } else { 0 })
.unwrap();
ctx.set_rounding(mode);
ctx.plus(&mut self.inner);
check_status(ctx.status()).ok()?;
CONTEXT.with(|ctx| {
let Context(mut ctx) = ctx.borrow().clone();
ctx.set_precision(ndig as _).unwrap();
ctx.set_max_exponent(ndig as _).unwrap();
ctx.set_min_exponent(if scale != 0 { -1 } else { 0 })
.unwrap();
ctx.set_rounding(mode);
ctx.plus(&mut self.inner);
check_status(ctx.status()).ok()
})?;
Self::try_from(self.inner).ok()
}
......@@ -272,49 +280,39 @@ impl TryFrom<DecimalImpl> for Decimal {
/// Context
////////////////////////////////////////////////////////////////////////////////
type Context = dec::Context<DecimalImpl>;
static mut CONTEXT: Lazy<Context> = Lazy::new(|| {
let mut ctx = Context::default();
ctx.set_rounding(dec::Rounding::HalfUp);
ctx.set_precision(ffi::DECIMAL_MAX_DIGITS as _).unwrap();
ctx.set_clamp(false);
ctx.set_max_exponent((ffi::DECIMAL_MAX_DIGITS - 1) as _)
.unwrap();
ctx.set_min_exponent(-1).unwrap();
ctx
});
#[derive(Clone)]
struct Context(dec::Context<DecimalImpl>);
// This will make Decimals thread safe in exchange for some performance penalty.
// Seeing as how tarantool's decimals aren't thread safe, for now we don't care
// thread_local! {
// static CONTEXT: Lazy<std::cell::RefCell<Context>> = Lazy::new(|| {
// let mut ctx = Context::default();
// ctx.set_rounding(dec::Rounding::HalfUp);
// ctx.set_precision(ffi::DECIMAL_MAX_DIGITS as _).unwrap();
// ctx.set_clamp(false);
// ctx.set_max_exponent((ffi::DECIMAL_MAX_DIGITS - 1) as _).unwrap();
// ctx.set_min_exponent(-1).unwrap();
// std::cell::RefCell::new(ctx)
// });
// }
impl Default for Context {
fn default() -> Self {
let mut ctx = dec::Context::default();
ctx.set_rounding(dec::Rounding::HalfUp);
ctx.set_precision(ffi::DECIMAL_MAX_DIGITS as _).unwrap();
ctx.set_clamp(false);
ctx.set_max_exponent((ffi::DECIMAL_MAX_DIGITS - 1) as _)
.unwrap();
ctx.set_min_exponent(-1).unwrap();
Self(ctx)
}
}
// This makes Decimals thread safe in exchange for some performance penalty.
thread_local! {
static CONTEXT: Lazy<std::cell::RefCell<Context>> = Lazy::new(std::cell::RefCell::default);
}
#[inline(always)]
fn with_context<F, T>(f: F) -> Option<T>
where
F: FnOnce(&mut Context) -> T,
F: FnOnce(&mut dec::Context<DecimalImpl>) -> T,
{
let ctx = unsafe { &mut CONTEXT };
let res = f(ctx);
let status = ctx.status();
ctx.set_status(Default::default());
check_status(status).map(|()| res).ok()
// CONTEXT.with(|ctx| {
// let ctx = &mut *ctx.borrow_mut();
// let res = f(ctx);
// let status = ctx.status();
// ctx.set_status(Default::default());
// check_status(status).map(|()| res).ok()
// })
CONTEXT.with(|ctx| {
let Context(ctx) = &mut *ctx.borrow_mut();
let res = f(ctx);
let status = ctx.status();
ctx.set_status(Default::default());
check_status(status).map(|()| res).ok()
})
}
////////////////////////////////////////////////////////////////////////////////
......@@ -500,7 +498,7 @@ macro_rules! impl_bin_op {
#[inline(always)]
#[track_caller]
fn $ass_op(&mut self, rhs: T) {
*self = self.$m(rhs).expect("overlow")
*self = self.$m(rhs).expect("overflow")
}
}
};
......@@ -649,8 +647,16 @@ macro_rules! impl_from_int {
}
impl_from_int! {i8 i16 i32 u8 u16 u32 => DecimalImpl::from}
impl_from_int! {i64 isize => |num| CONTEXT.from_i64(num as _)}
impl_from_int! {u64 usize => |num| CONTEXT.from_u64(num as _)}
impl_from_int! {
i64 isize => |num| {
CONTEXT.with(|ctx| ctx.borrow_mut().0.from_i64(num as _))
}
}
impl_from_int! {
u64 usize => |num| {
CONTEXT.with(|ctx| ctx.borrow_mut().0.from_u64(num as _))
}
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum DecimalFromfloatError<T> {
......@@ -858,14 +864,43 @@ macro_rules! decimal {
#[cfg(test)]
mod test {
use super::Decimal;
use once_cell::sync::Lazy;
use std::convert::TryFrom;
use std::sync::Mutex;
static DECIMALS_ARENT_THREAD_SAFE: Lazy<Mutex<()>> = Lazy::new(Default::default);
#[test]
fn thread_safe_decimal() {
let mut handles = Vec::new();
for _ in 0..100 {
handles.push(std::thread::spawn(move || {
for _ in 0..10 {
// Somehow this combination of successful and erroneous parse
// would consistently cause errors in the not thread-safe implementation.
let _: Decimal = "-81.1e-1".parse().unwrap();
let _ = "foobar".parse::<Decimal>().unwrap_err();
}
}));
}
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn from_string() {
let d: Decimal = "-81.1e-1".parse().unwrap();
assert_eq!(d.to_string(), "-8.11");
assert_eq!(decimal!(-81.1e-1).to_string(), "-8.11");
assert_eq!("foobar".parse::<Decimal>().ok(), None::<Decimal>);
assert_eq!("".parse::<Decimal>().ok(), None::<Decimal>);
// tarantool decimals don't support infinity or NaN
assert_eq!("inf".parse::<Decimal>().ok(), None::<Decimal>);
assert_eq!("infinity".parse::<Decimal>().ok(), None::<Decimal>);
assert_eq!("NaN".parse::<Decimal>().ok(), None::<Decimal>);
}
#[test]
fn from_num() {
let _lock = DECIMALS_ARENT_THREAD_SAFE.lock().unwrap();
assert_eq!(Decimal::from(0i8), Decimal::zero());
assert_eq!(Decimal::from(42i8).to_string(), "42");
assert_eq!(Decimal::from(i8::MAX).to_string(), "127");
......@@ -981,7 +1016,6 @@ mod test {
#[test]
pub fn to_num() {
let _lock = DECIMALS_ARENT_THREAD_SAFE.lock().unwrap();
assert_eq!(i64::try_from(decimal!(420)).unwrap(), 420);
assert_eq!(
i64::try_from(decimal!(9223372036854775807)).unwrap(),
......@@ -1077,7 +1111,6 @@ mod test {
#[test]
pub fn cmp() {
let _lock = DECIMALS_ARENT_THREAD_SAFE.lock().unwrap();
assert!(decimal!(.1) < decimal!(.2));
assert!(decimal!(.1) <= decimal!(.2));
assert!(decimal!(.2) > decimal!(.1));
......@@ -1091,7 +1124,6 @@ mod test {
#[test]
pub fn hash() {
let _lock = DECIMALS_ARENT_THREAD_SAFE.lock().unwrap();
fn to_hash<T: std::hash::Hash>(t: &T) -> u64 {
let mut s = std::collections::hash_map::DefaultHasher::new();
t.hash(&mut s);
......@@ -1134,7 +1166,6 @@ mod test {
#[test]
#[allow(clippy::bool_assert_comparison)]
pub fn ops() {
let _lock = DECIMALS_ARENT_THREAD_SAFE.lock().unwrap();
let a = decimal!(.1);
let b = decimal!(.2);
let c = decimal!(.3);
......
......@@ -1087,3 +1087,21 @@ extern "C" {
/// Tarantool stored procedure signature.
pub type Proc =
unsafe extern "C" fn(crate::tuple::FunctionCtx, crate::tuple::FunctionArgs) -> c_int;
// Cbus lcpipe.
#[cfg(feature = "picodata")]
#[repr(C)]
pub struct LCPipe {
_unused: [u8; 0],
}
#[cfg(feature = "picodata")]
extern "C" {
pub fn lcpipe_new(name: *const c_char) -> *mut LCPipe;
pub fn lcpipe_push_now(lcpipe: *mut LCPipe, cmsg: *mut c_void);
pub fn lcpipe_delete(lcpipe: *mut LCPipe);
pub fn cbus_endpoint_new(endpoint: *mut *mut c_void, name: *const c_char) -> c_int;
pub fn cbus_endpoint_delete(endpoint: *mut c_void) -> c_int;
pub fn cbus_loop(endpoint: *mut c_void);
pub fn cbus_process(endpoint: *mut c_void);
}
......@@ -18,6 +18,7 @@ use std::os::raw::c_void;
use std::ptr::NonNull;
use std::time::Duration;
use crate::time::Instant;
use crate::tlua::{self as tlua, AsLua};
#[cfg(not(all(target_arch = "aarch64", target_os = "macos")))]
......@@ -149,8 +150,10 @@ impl<'a, T> Fiber<'a, T> {
F: FnMut(Box<T>) -> i32,
{
let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
// The pointer into this variable must be valid until `fiber_new` returns.
let name_cstr = CString::new(name).expect("fiber name should not contain nul bytes");
Self {
inner: unsafe { ffi::fiber_new(CString::new(name).unwrap().into_raw(), trampoline) },
inner: unsafe { ffi::fiber_new(name_cstr.as_ptr(), trampoline) },
callback: callback_ptr,
phantom: PhantomData,
}
......@@ -173,14 +176,10 @@ impl<'a, T> Fiber<'a, T> {
F: FnMut(Box<T>) -> i32,
{
let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
// The pointer into this variable must be valid until `fiber_new_ex` returns.
let name_cstr = CString::new(name).expect("fiber name should not contain nul bytes");
Self {
inner: unsafe {
ffi::fiber_new_ex(
CString::new(name).unwrap().into_raw(),
attr.inner,
trampoline,
)
},
inner: unsafe { ffi::fiber_new_ex(name_cstr.as_ptr(), attr.inner, trampoline) },
callback: callback_ptr,
phantom: PhantomData,
}
......@@ -1317,24 +1316,12 @@ pub fn sleep(time: Duration) {
unsafe { ffi::fiber_sleep(time.as_secs_f64()) }
}
/// Report loop begin time as double (cheap).
pub fn time() -> f64 {
unsafe { ffi::fiber_time() }
}
/// Report loop begin time as 64-bit int.
pub fn time64() -> u64 {
unsafe { ffi::fiber_time64() }
}
/// Report loop begin time as double (cheap). Uses monotonic clock.
pub fn clock() -> f64 {
unsafe { ffi::fiber_clock() }
}
/// Report loop begin time as 64-bit int. Uses monotonic clock.
pub fn clock64() -> u64 {
unsafe { ffi::fiber_clock64() }
/// Get [`Instant`] corresponding to event loop iteration begin time.
/// Uses monotonic clock.
#[inline]
pub fn clock() -> Instant {
let secs = unsafe { ffi::fiber_clock() };
Instant(Duration::from_secs_f64(secs))
}
/// Yield control to the scheduler.
......@@ -1620,4 +1607,15 @@ mod tests {
jh.join();
assert_eq!(*res.borrow(), 1);
}
#[crate::test(tarantool = "crate")]
fn fiber_sleep_and_clock() {
let before_sleep = clock();
let sleep_for = Duration::from_millis(100);
sleep(sleep_for);
assert!(before_sleep.elapsed() >= sleep_for);
assert!(clock() >= before_sleep);
assert!(clock() - before_sleep >= sleep_for);
}
}
......@@ -25,13 +25,7 @@
//! - [`timeout::IntoTimeout`]
//! - [`IntoOnDrop`]
use std::{
future::Future,
pin::Pin,
rc::Rc,
task::Poll,
time::{Duration, Instant},
};
use std::{future::Future, pin::Pin, rc::Rc, task::Poll, time::Duration};
use futures::pin_mut;
......@@ -138,9 +132,9 @@ pub(crate) mod context {
use std::os::unix::io::RawFd;
use std::task::Context;
use std::task::Waker;
use std::time::Instant;
use crate::ffi::tarantool as ffi;
use crate::time::Instant;
/// The context is primarily used to pass wakup conditions from a
/// pending future to the async executor (i.e `block_on`). There's
......@@ -285,7 +279,7 @@ pub fn block_on<F: Future>(f: F) -> F::Output {
}
let timeout = match cx.deadline {
Some(deadline) => deadline.saturating_duration_since(Instant::now()),
Some(deadline) => deadline.duration_since(super::clock()),
None => Duration::MAX,
};
......@@ -312,14 +306,37 @@ pub fn block_on<F: Future>(f: F) -> F::Output {
}
}
/// An async friendly version of [fiber::sleep](crate::fiber::sleep). Prefer this version when working in async
/// contexts.
pub async fn sleep(time: Duration) {
use timeout::IntoTimeout as _;
// We can't just do a `fiber::sleep` as we need this to work well with other futures
let (tx, rx) = oneshot::channel::<()>();
rx.timeout(time).await.unwrap_err();
drop(tx);
}
#[cfg(feature = "internal_test")]
mod tests {
use std::cell::Cell;
use super::timeout::IntoTimeout as _;
use super::*;
use crate::fiber;
use crate::test::util::{always_pending, ok};
#[crate::test(tarantool = "crate")]
fn sleep_wakes_up() {
let before_sleep = fiber::clock();
let sleep_for = Duration::from_millis(100);
let should_yield = fiber::check_yield(|| fiber::block_on(sleep(sleep_for)));
assert_eq!(should_yield, fiber::YieldResult::Yielded(()));
assert!(before_sleep.elapsed() >= sleep_for);
}
#[crate::test(tarantool = "crate")]
fn on_drop_is_executed() {
block_on(async {
......
......@@ -8,9 +8,10 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use super::context::ContextExt;
use crate::fiber;
use crate::time::Instant;
/// Error returned by [`Timeout`]
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
......@@ -56,7 +57,7 @@ pub struct Timeout<F> {
pub fn timeout<F: Future>(timeout: Duration, f: F) -> Timeout<F> {
Timeout {
future: f,
deadline: Instant::now().checked_add(timeout),
deadline: fiber::clock().checked_add(timeout),
}
}
......@@ -84,7 +85,7 @@ where
// Then check deadline and, if necessary, update wakup condition
// in the context.
match deadline {
Some(deadline) if Instant::now() >= deadline => {
Some(deadline) if fiber::clock() >= deadline => {
Poll::Ready(Err(Error::Expired)) // expired
}
Some(deadline) => {
......
......@@ -91,7 +91,7 @@ mod tests {
#[crate::test(tarantool = "crate")]
fn performance() {
let now = std::time::Instant::now();
let now = crate::time::Instant::now();
let _ = super::csw();
let elapsed = now.elapsed();
print!("{elapsed:?} ");
......
......@@ -44,6 +44,8 @@
//! Our examples are a good starting point for users who want to confidently start writing their own stored procedures.
//!
//! [stored procedure]: macro@crate::proc
#[cfg(feature = "picodata")]
pub mod cbus;
pub mod clock;
pub mod coio;
pub mod datetime;
......@@ -67,6 +69,7 @@ pub mod space;
pub mod sql;
#[cfg(feature = "test")]
pub mod test;
pub mod time;
pub mod transaction;
pub mod trigger;
pub mod tuple;
......
......@@ -7,7 +7,7 @@ use std::time::Duration;
use crate::coio::CoIOStream;
use crate::error::Error;
use crate::fiber::{is_cancelled, set_cancellable, sleep, time, Cond, Fiber};
use crate::fiber::{clock, is_cancelled, set_cancellable, sleep, Cond, Fiber};
use crate::net_box::stream::ConnStream;
use crate::tuple::Decode;
use crate::unwrap_or;
......@@ -100,7 +100,7 @@ impl ConnInner {
}
pub fn wait_connected(self: &Rc<Self>, timeout: Option<Duration>) -> Result<bool, Error> {
let begin_ts = time();
let begin_ts = clock();
loop {
let state = self.state.get();
match state {
......@@ -112,9 +112,7 @@ impl ConnInner {
_ => {
let timeout = match timeout {
None => None,
Some(timeout) => {
timeout.checked_sub(Duration::from_secs_f64(time() - begin_ts))
}
Some(timeout) => timeout.checked_sub(clock().duration_since(begin_ts)),
};
if !self.wait_state_changed(timeout) {
......
......@@ -2,10 +2,10 @@ use std::{
cell::{Cell, UnsafeCell},
io,
rc::{Rc, Weak},
time::{Duration, Instant},
time::Duration,
};
use crate::{clock::INFINITY, error::Error, fiber::Cond, tuple::Decode, Result};
use crate::{clock::INFINITY, error::Error, fiber::Cond, time::Instant, tuple::Decode, Result};
use super::{inner::ConnInner, protocol::Consumer};
......
......@@ -27,14 +27,13 @@ use std::os::unix::prelude::IntoRawFd;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{io, ptr};
use futures::{AsyncRead, AsyncWrite};
use crate::ffi::tarantool as ffi;
use crate::fiber::r#async;
use crate::fiber::r#async::context::ContextExt;
use crate::fiber::{self, r#async};
#[derive(thiserror::Error, Debug)]
pub enum Error {
......@@ -213,7 +212,7 @@ impl AsyncWrite for TcpStream {
//
// SAFETY: Safe as long as this future is executed by
// `fiber::block_on` async executor.
unsafe { ContextExt::set_deadline(cx, Instant::now()) }
unsafe { ContextExt::set_deadline(cx, fiber::clock()) }
Poll::Pending
}
_ => Poll::Ready(Err(err)),
......@@ -261,7 +260,7 @@ impl AsyncRead for TcpStream {
//
// SAFETY: Safe as long as this future is executed by
// `fiber::block_on` async executor.
unsafe { ContextExt::set_deadline(cx, Instant::now()) }
unsafe { ContextExt::set_deadline(cx, fiber::clock()) }
Poll::Pending
}
_ => Poll::Ready(Err(err)),
......
//! Provides a custom [`Instant`] implementation, based on tarantool fiber API.
use std::mem::MaybeUninit;
use std::ops::{Add, AddAssign, Sub, SubAssign};
use std::time::Duration;
/// A measurement of a monotonically nondecreasing clock.
/// Opaque and useful only with [`Duration`].
///
/// Instants are guaranteed to be no less than any previously
/// measured instant when created, and are often useful for tasks such as measuring
/// benchmarks or timing how long an operation takes.
///
/// Note, however, that instants are **not** guaranteed to be **steady**. In other
/// words, each tick of the underlying clock might not be the same length (e.g.
/// some seconds may be longer than others). An instant may jump forwards or
/// experience time dilation (slow down or speed up), but it will never go
/// backwards.
///
/// Instants are opaque types that can only be compared to one another. There is
/// no method to get "the number of seconds" from an instant. Instead, it only
/// allows measuring the duration between two instants (or comparing two
/// instants).
///
/// This struct is almost identical to [`std::time::Instant`] but provides
/// some additional saturating methods. And it can also be constructed with
/// [`fiber::clock`](crate::fiber::clock), in which case it behaves in a tarantool specific way.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Instant(pub(crate) Duration);
impl Instant {
/// Returns an instant corresponding to "now". Uses monotonic clock.
///
/// # Examples
///
/// ```no_run
/// use tarantool::time::Instant;
///
/// let now = Instant::now();
/// ```
#[must_use]
#[inline]
pub fn now() -> Self {
unsafe {
let mut timespec = MaybeUninit::<libc::timespec>::zeroed().assume_init();
if libc::clock_gettime(libc::CLOCK_MONOTONIC, (&mut timespec) as *mut _) != 0 {
let err = std::io::Error::last_os_error();
panic!("failed to get time: {}", err)
}
Self(Duration::new(
timespec.tv_sec as u64,
timespec.tv_nsec as u32,
))
}
}
/// Returns the amount of time elapsed since this instant was created.
///
/// # Examples
///
/// ```no_run
/// use std::time::Duration;
/// use tarantool::time::Instant;
/// use tarantool::fiber;
///
/// let instant = Instant::now();
/// let three_secs = Duration::from_secs(3);
/// fiber::sleep(three_secs);
/// assert!(instant.elapsed() >= three_secs);
/// ```
#[must_use]
#[inline]
pub fn elapsed(&self) -> Duration {
Self::now().duration_since(*self)
}
/// Returns `Some(t)` where `t` is the time `self + duration` if `t` can be represented as
/// `Instant` (which means it's inside the bounds of the underlying representation), `None`
/// otherwise.
#[must_use]
#[inline]
pub fn checked_add(&self, duration: Duration) -> Option<Instant> {
self.0.checked_add(duration).map(Instant)
}
/// Returns `Some(t)` where `t` is the time `self - duration` if `t` can be represented as
/// `Instant` (which means it's inside the bounds of the underlying representation), `None`
/// otherwise.
#[must_use]
#[inline]
pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
self.0.checked_sub(duration).map(Instant)
}
/// Saturating addition. Computes `self + duration`, returning maximal possible
/// instant (allowed by the underlying representaion) if overflow occurred.
#[must_use]
#[inline]
pub fn saturating_add(&self, duration: Duration) -> Instant {
Self(self.0.saturating_add(duration))
}
/// Saturating subtraction. Computes `self - duration`, returning minimal possible
/// instant (allowed by the underlying representaion) if overflow occurred.
#[must_use]
#[inline]
pub fn saturating_sub(&self, duration: Duration) -> Instant {
Self(self.0.saturating_sub(duration))
}
/// Returns the amount of time elapsed from another instant to this one,
/// or None if that instant is later than this one.
///
/// # Examples
///
/// ```no_run
/// use std::time::Duration;
/// use std::thread::sleep;
/// use tarantool::time::Instant;
///
/// let now = Instant::now();
/// sleep(Duration::new(1, 0));
/// let new_now = Instant::now();
/// println!("{:?}", new_now.checked_duration_since(now));
/// println!("{:?}", now.checked_duration_since(new_now)); // None
/// ```
#[must_use]
#[inline]
pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> {
self.0.checked_sub(earlier.0)
}
/// Returns the amount of time elapsed from another instant to this one,
/// or zero duration if that instant is later than this one.
///
/// # Examples
///
/// ```no_run
/// use std::time::Duration;
/// use std::thread::sleep;
/// use tarantool::time::Instant;
///
/// let now = Instant::now();
/// sleep(Duration::new(1, 0));
/// let new_now = Instant::now();
/// println!("{:?}", new_now.duration_since(now));
/// println!("{:?}", now.duration_since(new_now)); // 0ns
/// ```
#[must_use]
#[inline]
pub fn duration_since(&self, earlier: Instant) -> Duration {
self.0.saturating_sub(earlier.0)
}
}
impl Add<Duration> for Instant {
type Output = Instant;
/// # Panics
///
/// This function may panic if the resulting point in time cannot be represented by the
/// underlying data structure. See [`Instant::checked_add`] for a version without panic.
fn add(self, other: Duration) -> Instant {
self.checked_add(other)
.expect("overflow when adding duration to instant")
}
}
impl AddAssign<Duration> for Instant {
fn add_assign(&mut self, other: Duration) {
*self = *self + other;
}
}
impl Sub<Duration> for Instant {
type Output = Instant;
fn sub(self, other: Duration) -> Instant {
self.checked_sub(other)
.expect("overflow when subtracting duration from instant")
}
}
impl SubAssign<Duration> for Instant {
fn sub_assign(&mut self, other: Duration) {
*self = *self - other;
}
}
impl Sub<Instant> for Instant {
type Output = Duration;
/// Returns the amount of time elapsed from another instant to this one,
/// or zero duration if that instant is later than this one.
fn sub(self, other: Instant) -> Duration {
self.duration_since(other)
}
}
#[cfg(test)]
mod tests {
use super::Instant;
use std::time::Duration;
#[test]
fn fiber_sleep() {
let before_sleep = Instant::now();
let sleep_for = Duration::from_millis(100);
std::thread::sleep(sleep_for);
assert!(Instant::now() >= before_sleep);
assert!(before_sleep.elapsed() >= Duration::ZERO);
}
#[test]
fn addition() {
let now = Instant::now();
assert_eq!(now.checked_add(Duration::MAX), None);
assert_eq!(now.saturating_add(Duration::MAX), Instant(Duration::MAX));
let plus_second = now.checked_add(Duration::from_secs(1)).unwrap();
assert_eq!(plus_second, now.saturating_add(Duration::from_secs(1)));
assert_eq!(plus_second, now + Duration::from_secs(1));
assert!(plus_second > now);
}
#[test]
fn subtraction() {
let now = Instant::now();
assert_eq!(now.checked_sub(Duration::MAX), None);
assert_eq!(now.saturating_sub(Duration::MAX), Instant(Duration::ZERO));
let minus_second = now.checked_sub(Duration::from_secs(1)).unwrap();
assert_eq!(minus_second, now.saturating_sub(Duration::from_secs(1)));
assert_eq!(minus_second, now - Duration::from_secs(1));
assert!(minus_second < now);
}
#[test]
fn duration_since() {
let now = Instant::now();
let plus_second = now + Duration::from_secs(1);
let minus_second = now - Duration::from_secs(1);
assert_eq!(
plus_second.duration_since(minus_second),
Duration::from_secs(2)
);
assert_eq!(
plus_second.checked_duration_since(minus_second),
Some(Duration::from_secs(2))
);
assert_eq!(minus_second.duration_since(plus_second), Duration::ZERO);
assert_eq!(minus_second.checked_duration_since(plus_second), None);
}
}
......@@ -15,20 +15,6 @@ pub fn to_lua() {
assert_eq!(s, "-8.11");
}
pub fn from_string() {
let d: Decimal = "-81.1e-1".parse().unwrap();
assert_eq!(d.to_string(), "-8.11");
assert_eq!(decimal!(-81.1e-1).to_string(), "-8.11");
assert_eq!("foobar".parse::<Decimal>().ok(), None::<Decimal>);
assert_eq!("".parse::<Decimal>().ok(), None::<Decimal>);
// tarantool decimals don't support infinity or NaN
assert_eq!("inf".parse::<Decimal>().ok(), None::<Decimal>);
assert_eq!("infinity".parse::<Decimal>().ok(), None::<Decimal>);
assert_eq!("NaN".parse::<Decimal>().ok(), None::<Decimal>);
}
pub fn from_tuple() {
let t: Tuple = tarantool::lua_state()
.eval("return box.tuple.new(require('decimal').new('-8.11'))")
......