Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/tarantool-module
1 result
Show changes
Commits on Source (16)
Showing
with 2575 additions and 149 deletions
......@@ -135,7 +135,6 @@ build-base-image:
- cargo clippy --features "${CARGO_FEATURES}" --workspace -- --deny warnings
- cargo build --features "${CARGO_FEATURES}" --all
- cargo test --features "${CARGO_FEATURES}"
- ./tests/test.sh
- |
# Save cache
if [ "$CI_COMMIT_BRANCH" == "$CI_DEFAULT_BRANCH" ]; then
......@@ -156,7 +155,7 @@ test-vanilla:
variables:
CACHE_ARCHIVE: /shared-storage/tarantool-module/vanilla-cache.tar
DOCKER_IMAGE: ${VANILLA_DOCKER_IMAGE}
CARGO_FEATURES: ""
CARGO_FEATURES: default
test-picodata:
extends: .test
......@@ -168,9 +167,10 @@ test-picodata:
pages:
extends: .test
variables:
DOCKER_IMAGE: ${VANILLA_DOCKER_IMAGE}
DOCKER_IMAGE: ${PICODATA_DOCKER_IMAGE}
RUSTDOCFLAGS: "-Dwarnings"
script:
- cargo doc --no-deps
- cargo doc --workspace --no-deps --features "picodata"
- rm -rf public
- mv target/doc public
artifacts:
......
......@@ -93,6 +93,8 @@
- In `tlua` if a lua error happens during code evaluation the location in the
rust program where the code was created is now displayed in the error, i.e.
the location of a call to `Lua::eval`, `Lua::exec`, etc. will be displayed.
- `tlua::Lua::set` function now has 2 generic parameters instead of 3 (not
including lifetime parameters).
### Deprecated
- `update_ops` & `upsert_ops` methods of `Space` & `Index` are deprecated in
......
......@@ -3,11 +3,11 @@ all:
test:
cargo build -p tarantool-module-test-runner
tests/test.sh
cargo test
test-pd:
cargo build -p tarantool-module-test-runner --features=picodata
TARANTOOL_EXECUTABLE=tarantool-pd tests/test.sh
TARANTOOL_EXECUTABLE=tarantool-pd cargo test
benchmark:
tests/run_benchmarks.lua
......@@ -434,6 +434,8 @@ make
make test
```
See [test readme](./tests/README.md) for more information about test structure and adding them.
## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
......@@ -449,6 +451,7 @@ We use [SemVer](http://semver.org/) for versioning. For the versions available,
- **Anton Melnikov**
- **Dmitriy Koltsov**
- **Georgy Moshkin**
- **Egor Ivkov**
© 2020-2022 Picodata.io https://git.picodata.io/picodata
## License
......
......@@ -42,14 +42,18 @@ sha-1 = "0.9"
tarantool-proc = { path = "../tarantool-proc", version = "0.1.1" }
uuid = "0.8.2"
futures = "0.3.25"
linkme = { version = "0.2.10", optional = true }
tester = { version = "0.7.0", optional = true }
[target.'cfg(not(all(target_arch = "aarch64", target_os = "macos")))'.dependencies]
va_list = "0.1.3"
[features]
default = ["net_box"]
default = ["net_box", "network_client"]
net_box = ["refpool"]
schema = []
defer = []
picodata = []
network_client = []
all = ["default", "schema", "defer"]
tarantool_test = ["linkme", "tester"]
......@@ -39,6 +39,7 @@ pub use channel::{
pub mod mutex;
use crate::ffi::tarantool::fiber_sleep;
pub use mutex::Mutex;
pub use r#async::block_on;
macro_rules! impl_debug_stub {
($t:ident $($p:tt)*) => {
......
......@@ -2,148 +2,14 @@ use std::{future::Future, rc::Rc, task::Poll, time::Instant};
use futures::pin_mut;
pub mod oneshot {
use super::timeout::Timeout;
use std::{
cell::Cell,
future::Future,
pin::Pin,
rc::{Rc, Weak},
task::{Context, Poll, Waker},
time::Duration,
};
pub mod oneshot;
pub mod timeout;
pub mod watch;
enum State<T> {
Pending(Option<Waker>),
Ready(T),
}
impl<T> Default for State<T> {
fn default() -> Self {
Self::Pending(None)
}
}
pub struct Receiver<T>(Rc<Cell<State<T>>>);
pub struct Sender<T>(Weak<Cell<State<T>>>);
impl<T> Receiver<T> {
pub fn timeout(self, timeout: Duration) -> Timeout<Self> {
super::timeout::timeout(timeout, self)
}
}
impl<T> Future for Receiver<T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let cell = &self.0;
match cell.take() {
State::Pending(mut waker) if Rc::weak_count(cell) > 0 => {
waker.get_or_insert_with(|| cx.waker().clone());
cell.set(State::Pending(waker));
Poll::Pending
}
State::Pending(_) => Poll::Ready(None),
State::Ready(t) => Poll::Ready(Some(t)),
}
}
}
impl<T> Sender<T> {
/// Sends the `value` and notifies the receiver.
pub fn send(self, value: T) {
let cell = if let Some(cell) = self.0.upgrade() {
cell
} else {
return;
};
if let State::Pending(Some(waker)) = cell.take() {
waker.wake()
}
cell.set(State::Ready(value));
}
/// Returns true if there's no receiver awaiting,
pub fn is_dropped(&self) -> bool {
self.0.strong_count() == 0
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let cell = if let Some(cell) = self.0.upgrade() {
cell
} else {
return;
};
match cell.take() {
ready @ State::Ready(_) => cell.set(ready),
State::Pending(Some(waker)) => waker.wake(),
State::Pending(None) => (),
}
}
}
pub fn channel<T>() -> (Receiver<T>, Sender<T>) {
let cell = Cell::new(State::default());
let strong = Rc::from(cell);
let weak = Rc::downgrade(&strong);
(Receiver(strong), Sender(weak))
}
}
pub mod timeout {
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use super::context::ContextExt;
pub struct Timeout<F> {
future: F,
deadline: Instant,
}
pub fn timeout<F: Future>(timeout: Duration, f: F) -> Timeout<F> {
Timeout {
future: f,
deadline: Instant::now() + timeout,
}
}
impl<F: Future> Timeout<F> {
#[inline]
fn pin_get_future(self: Pin<&mut Self>) -> Pin<&mut F> {
// This is okay because `field` is pinned when `self` is.
unsafe { self.map_unchecked_mut(|s| &mut s.future) }
}
}
impl<F: Future> Future for Timeout<F> {
type Output = Option<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let deadline = self.deadline;
// First, try polling the future
if let Poll::Ready(v) = self.pin_get_future().poll(cx) {
Poll::Ready(Some(v))
} else if Instant::now() > deadline {
Poll::Ready(None) // expired
} else {
// SAFETY: This is safe as long as the `Context` really
// is the `ContextExt`. It's always true within provided
// `block_on` async runtime.
unsafe { ContextExt::set_deadline(cx, deadline) };
Poll::Pending
}
}
}
}
/// Error that happens on the receiver side of the channel.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
#[error("sender dropped")]
pub struct RecvError;
mod waker {
use crate::fiber;
......@@ -263,6 +129,9 @@ mod context {
}
}
/// Runs a future to completion on the fiber-based runtime. This is the async runtime’s entry point.
///
/// This runs the given future on the current fiber, blocking until it is complete, and yielding its resolved result.
pub fn block_on<F: Future>(f: F) -> F::Output {
let rcw: Rc<waker::FiberWaker> = Default::default();
let waker = waker::with_rcw(rcw.clone());
......
//! A one-shot channel is used for sending a single message between
//! asynchronous tasks. The [`channel`] function is used to create a
//! [`Sender`] and [`Receiver`] handle pair that form the channel.
//!
//! The `Sender` handle is used by the producer to send the value.
//! The `Receiver` handle is used by the consumer to receive the value.
//!
//! Each handle can be used on separate fiber.
//!
//! Since the `send` method is not async it can be used from non-async code.
//!
//! # Example
//! ```no_run
//! use tarantool::fiber::r#async::oneshot;
//! use tarantool::fiber;
//!
//! let (tx, rx) = oneshot::channel::<i32>();
//! tx.send(56);
//! let value = fiber::block_on(rx);
//! ```
//!
//! If the sender is dropped without sending, the receiver will fail with
//! [`super::RecvError`]:
use super::RecvError;
use std::{
cell::Cell,
future::Future,
pin::Pin,
rc::{Rc, Weak},
task::{Context, Poll, Waker},
};
enum State<T> {
Pending(Option<Waker>),
Ready(T),
}
impl<T> Default for State<T> {
fn default() -> Self {
Self::Pending(None)
}
}
/// Receives a value from the associated [`Sender`].
///
/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
/// [`channel`](fn@channel) function.
///
/// This channel has no `recv` method because the receiver itself implements the
/// [`Future`] trait. To receive a value, `.await` the `Receiver` object directly.
///
/// If the sender is dropped without sending, the receiver will fail with
/// [`super::RecvError`]
pub struct Receiver<T>(Rc<Cell<State<T>>>);
/// Sends a value to the associated [`Receiver`].
///
/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
/// [`channel`](fn@channel) function.
///
/// If the sender is dropped without sending, the receiver will fail with
/// [`super::RecvError`]
pub struct Sender<T>(Weak<Cell<State<T>>>);
impl<T> Receiver<T> {
/// Returns `true` if the associated [`Sender`] handle has been dropped.
///
/// If `true` is returned, awaiting this future will always result in an error.
#[inline]
pub fn is_closed(&self) -> bool {
Rc::weak_count(&self.0) == 0
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let cell = &self.0;
match cell.take() {
State::Pending(mut waker) if !self.is_closed() => {
waker.get_or_insert_with(|| cx.waker().clone());
cell.set(State::Pending(waker));
Poll::Pending
}
State::Pending(_) => Poll::Ready(Err(RecvError)),
State::Ready(t) => Poll::Ready(Ok(t)),
}
}
}
impl<T> Sender<T> {
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
/// This method consumes `self` as only one value may ever be sent on a oneshot
/// channel. It is not marked async because sending a message to an oneshot
/// channel never requires any form of waiting. Because of this, the `send`
/// method can be used in both synchronous and asynchronous code without
/// problems.
///
/// A successful send occurs when it is determined that the other end of the
/// channel has not hung up already. An unsuccessful send would be one where
/// the corresponding receiver has already been deallocated. Note that a
/// return value of `Err` means that the data will never be received, but
/// a return value of `Ok` does *not* mean that the data will be received.
/// It is possible for the corresponding receiver to hang up immediately
/// after this function returns `Ok`.
pub fn send(self, value: T) -> Result<(), T> {
let cell = if let Some(cell) = self.0.upgrade() {
cell
} else {
return Err(value);
};
if let State::Pending(Some(waker)) = cell.take() {
waker.wake()
}
cell.set(State::Ready(value));
Ok(())
}
/// Returns `true` if the associated [`Receiver`] handle has been dropped.
///
/// A [`Receiver`] is closed when
/// [`Receiver`] value is dropped.
///
/// If `true` is returned, a call to `send` will always result in an error.
pub fn is_closed(&self) -> bool {
self.0.strong_count() == 0
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let cell = if let Some(cell) = self.0.upgrade() {
cell
} else {
return;
};
match cell.take() {
ready @ State::Ready(_) => cell.set(ready),
State::Pending(Some(waker)) => waker.wake(),
State::Pending(None) => (),
}
}
}
/// Creates a new one-shot channel for sending single values across asynchronous
/// tasks.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
/// handle is used by the producer to send the value. The `Receiver` handle is
/// used by the consumer to receive the value.
///
/// Each handle can be used on separate fibers.
///
/// See [`super::oneshot`] for examples.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let cell = Cell::new(State::default());
let strong = Rc::from(cell);
let weak = Rc::downgrade(&strong);
(Sender(weak), Receiver(strong))
}
#[cfg(feature = "tarantool_test")]
mod tests {
use super::*;
use crate::fiber;
use crate::test::{TestCase, TESTS};
use crate::test_name;
use futures::join;
use linkme::distributed_slice;
use std::time::Duration;
#[distributed_slice(TESTS)]
static DROP_RECEIVER: TestCase = TestCase {
name: test_name!("drop_receiver"),
f: || {
let (tx, rx) = channel::<i32>();
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());
assert_eq!(tx.send(0).unwrap_err(), 0);
},
};
#[distributed_slice(TESTS)]
static DROP_SENDER: TestCase = TestCase {
name: test_name!("drop_sender"),
f: || {
let (tx, rx) = channel::<i32>();
assert!(!rx.is_closed());
drop(tx);
assert!(rx.is_closed());
assert_eq!(fiber::block_on(rx).unwrap_err(), RecvError);
},
};
#[distributed_slice(TESTS)]
static RECEIVE_NON_BLOCKING: TestCase = TestCase {
name: test_name!("receive_non_blocking"),
f: || {
let (tx, rx) = channel::<i32>();
tx.send(56).unwrap();
assert_eq!(fiber::block_on(rx), Ok(56));
},
};
#[distributed_slice(TESTS)]
static RECEIVE_NON_BLOCKING_AFTER_DROPPING_SENDER: TestCase = TestCase {
name: test_name!("receive_non_blocking_after_dropping_sender"),
f: || {
let (tx, rx) = channel::<i32>();
drop(tx);
assert_eq!(fiber::block_on(rx), Err(RecvError));
},
};
#[distributed_slice(TESTS)]
static RECEIVE_BLOCKING_BEFORE_SENDING: TestCase = TestCase {
name: test_name!("receive_blocking_before_sending"),
f: || {
let (tx, rx) = channel::<i32>();
let jh = fiber::start(move || fiber::block_on(rx));
tx.send(39).unwrap();
assert_eq!(jh.join(), Ok(39));
},
};
#[distributed_slice(TESTS)]
static RECEIVE_BLOCKING_BEFORE_DROPPING_SENDER: TestCase = TestCase {
name: test_name!("receive_blocking_before_dropping_sender"),
f: || {
let (tx, rx) = channel::<i32>();
let jh = fiber::start(move || fiber::block_on(rx));
drop(tx);
assert_eq!(jh.join(), Err(RecvError));
},
};
#[distributed_slice(TESTS)]
static JOIN_TWO_AFTER_SENDING: TestCase = TestCase {
name: test_name!("join_two_after_sending"),
f: || {
let f = async {
let (tx1, rx1) = channel::<i32>();
let (tx2, rx2) = channel::<i32>();
tx1.send(101).unwrap();
tx2.send(102).unwrap();
join!(rx1, rx2)
};
assert_eq!(fiber::block_on(f), (Ok(101), Ok(102)));
},
};
#[distributed_slice(TESTS)]
static JOIN_TWO_BEFORE_SENDING: TestCase = TestCase {
name: test_name!("join_two_before_sending"),
f: || {
let c = fiber::Cond::new();
drop(c);
let (tx1, rx1) = channel::<i32>();
let (tx2, rx2) = channel::<i32>();
let jh = fiber::start(move || fiber::block_on(async { join!(rx1, rx2) }));
tx1.send(201).unwrap();
fiber::sleep(Duration::ZERO);
tx2.send(202).unwrap();
assert_eq!(jh.join(), (Ok(201), Ok(202)));
},
};
#[distributed_slice(TESTS)]
static JOIN_TWO_DROP_ONCE: TestCase = TestCase {
name: test_name!("join_two_drop_one"),
f: || {
let (tx1, rx1) = channel::<i32>();
let (tx2, rx2) = channel::<i32>();
let jh = fiber::start(move || fiber::block_on(async { join!(rx1, rx2) }));
tx1.send(301).unwrap();
fiber::sleep(Duration::ZERO);
drop(tx2);
assert_eq!(jh.join(), (Ok(301), Err(RecvError)));
},
};
}
//! Allows a future to execute for a maximum amount of time.
//!
//! See [`Timeout`] documentation for more details.
//!
//! [`Timeout`]: struct@Timeout
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use super::context::ContextExt;
/// Error returned by [`Timeout`]
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
#[error("deadline expired")]
pub struct Expired;
/// Future returned by [`timeout`](timeout).
pub struct Timeout<F> {
future: F,
deadline: Instant,
}
/// Requires a `Future` to complete before the specified duration has elapsed.
///
/// If the future completes before the duration has elapsed, then the completed
/// value is returned. Otherwise, an error is returned and the future is
/// canceled.
///
/// A `timeout` equal to [`Duration::ZERO`] guarantees that awaiting this future
/// will **not** result in a fiber yield.
///
/// ```no_run
/// use tarantool::fiber::r#async::*;
/// use tarantool::fiber;
/// use std::time::Duration;
///
/// let (tx, rx) = oneshot::channel::<i32>();
///
/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
/// if let Err(_) = fiber::block_on(timeout::timeout(Duration::from_millis(10), rx)) {
/// println!("did not receive value within 10 ms");
/// }
/// ```
#[inline]
pub fn timeout<F: Future>(timeout: Duration, f: F) -> Timeout<F> {
let now = Instant::now();
let deadline = now.checked_add(timeout).unwrap_or_else(|| {
// Add 30 years for now, because this is what tokio does:
// https://github.com/tokio-rs/tokio/blob/22862739dddd49a94065aa7a917cde2dc8a3f6bc/tokio/src/time/instant.rs#L58-L62
now + Duration::from_secs(60 * 60 * 24 * 365 * 30)
});
Timeout {
future: f,
deadline,
}
}
impl<F: Future> Timeout<F> {
#[inline]
fn pin_get_future(self: Pin<&mut Self>) -> Pin<&mut F> {
// This is okay because `field` is pinned when `self` is.
unsafe { self.map_unchecked_mut(|s| &mut s.future) }
}
}
impl<F: Future> Future for Timeout<F> {
type Output = Result<F::Output, Expired>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let deadline = self.deadline;
// First, try polling the future
if let Poll::Ready(v) = self.pin_get_future().poll(cx) {
Poll::Ready(Ok(v))
} else if Instant::now() >= deadline {
Poll::Ready(Err(Expired)) // expired
} else {
// SAFETY: This is safe as long as the `Context` really
// is the `ContextExt`. It's always true within provided
// `block_on` async runtime.
unsafe { ContextExt::set_deadline(cx, deadline) };
Poll::Pending
}
}
}
/// Futures implementing this trait can be constrained with a timeout (see
/// [`Timeout`]).
///
/// **NOTE**: this trait is implemented for all type implementing
/// [`std::future::Future`], but it must be used **only** with futures from
/// [`crate::fiber::async`] otherwise the behaviour is undefined.
pub trait IntoTimeout: Future + Sized {
/// Adds timeout to a future. See [`Timeout`].
#[inline]
fn timeout(self, timeout: Duration) -> Timeout<Self> {
super::timeout::timeout(timeout, self)
}
}
impl<T> IntoTimeout for T where T: Future + Sized {}
#[cfg(feature = "tarantool_test")]
mod tests {
use super::*;
use crate::fiber;
use crate::fiber::r#async::{oneshot, RecvError};
use crate::test::check_yield;
use crate::test::YieldResult::{DoesntYield, Yields};
use crate::test::{TestCase, TESTS};
use crate::test_name;
use linkme::distributed_slice;
use std::time::Duration;
const _0_SEC: Duration = Duration::ZERO;
const _1_SEC: Duration = Duration::from_secs(1);
#[distributed_slice(TESTS)]
static INSTANT_FUTURE: TestCase = TestCase {
name: test_name!("instant_future"),
f: || {
let fut = async { 78 };
assert_eq!(fiber::block_on(fut), 78);
let fut = timeout(Duration::ZERO, async { 79 });
assert_eq!(fiber::block_on(fut), Ok(79));
},
};
#[distributed_slice(TESTS)]
static ACTUAL_TIMEOUT_PROMISE: TestCase = TestCase {
name: test_name!("actual_timeout_promise"),
f: || {
let (tx, rx) = oneshot::channel::<i32>();
let fut = async move { rx.timeout(_0_SEC).await };
let jh = fiber::start(|| fiber::block_on(fut));
assert_eq!(jh.join(), Err(Expired));
drop(tx);
},
};
#[distributed_slice(TESTS)]
static DROP_TX_BEFORE_TIMEOUT: TestCase = TestCase {
name: test_name!("drop_tx_before_timeout"),
f: || {
let (tx, rx) = oneshot::channel::<i32>();
let fut = async move { rx.timeout(_1_SEC).await };
let jh = fiber::start(move || fiber::block_on(fut));
drop(tx);
assert_eq!(jh.join(), Ok(Err(RecvError)));
},
};
#[distributed_slice(TESTS)]
static SEND_TX_BEFORE_TIMEOUT: TestCase = TestCase {
name: test_name!("send_tx_before_timeout"),
f: || {
let (tx, rx) = oneshot::channel::<i32>();
let fut = async move { rx.timeout(_1_SEC).await };
let jh = fiber::start(move || fiber::block_on(fut));
tx.send(400).unwrap();
assert_eq!(jh.join(), Ok(Ok(400)));
},
};
#[distributed_slice(TESTS)]
static TIMEOUT_DURATION_MAX: TestCase = TestCase {
name: test_name!("timeout_duration_max"),
f: || {
// must not panic
timeout(Duration::MAX, async { 1 });
},
};
#[distributed_slice(TESTS)]
static AWAIT_ACTUALLY_YIELDS: TestCase = TestCase {
name: test_name!("await_actually_yields"),
f: || {
// ready future, no timeout -> no yield
assert_eq!(
check_yield(|| fiber::block_on(async { 101 })),
DoesntYield(101)
);
// ready future, 0 timeout -> no yield
assert_eq!(
check_yield(|| fiber::block_on(timeout(Duration::ZERO, async { 202 }))),
DoesntYield(Ok(202))
);
// ready future, positive timeout -> no yield
assert_eq!(
check_yield(|| fiber::block_on(timeout(Duration::from_secs(1), async { 303 }))),
DoesntYield(Ok(303))
);
// pending future, no timeout -> yield
let (_tx, rx) = oneshot::channel::<i32>();
let f = check_yield(|| fiber::start(|| fiber::block_on(rx)));
// the yield happens as soon as fiber::start is called,
// but if fiber::block_on didn't yield we wouldn't even get here,
// so this check is totally legit
assert!(matches!(f, Yields(_)));
// we leak some memory here, but avoid a panic.
// Don't do this in your code
std::mem::forget(f);
// pending future, 0 timeout -> no yield
let (_tx, rx) = oneshot::channel::<i32>();
assert_eq!(
check_yield(|| fiber::block_on(timeout(Duration::ZERO, rx))),
DoesntYield(Err(Expired))
);
// pending future, positive timeout -> yield
let (_tx, rx) = oneshot::channel::<i32>();
assert_eq!(
check_yield(|| fiber::block_on(timeout(Duration::from_millis(10), rx))),
Yields(Err(Expired))
);
},
};
}
// SAFETY:
// In this module `RefCell::borrow` is used a lot.
// This method panics if there are alive mutable borrows at that moment.
// But in this case it is safe to do this as:
// 1. Mutable borrows are taken and released in an encapsulated Sender functions
// 2. There are no `await` or `fiber::sleep` calls inside sender functions
// 3. This module is meant for single threaded async runtime
//
//! A single-producer, multi-consumer channel that only retains the *last* sent
//! value.
//!
//! This channel is useful for watching for changes to a value from multiple
//! points in the code base, for example, changes to configuration values.
//!
//! # Usage
//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and sender halves of the channel. The channel is created with an initial
//! value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to sent by the [`Sender`] half.
//!
//! # Example
//! ```no_run
//! use tarantool::fiber::r#async::watch;
//! use tarantool::fiber;
//!
//! let (tx, mut rx) = watch::channel::<i32>(10);
//! tx.send(20).unwrap();
//! let value = fiber::block_on(async move {
//! rx.changed().await.unwrap();
//! rx.get()
//! });
//! ```
//!
//! # Closing
//!
//! [`Sender::is_closed`] allows the producer to detect
//! when all [`Receiver`] handles have been dropped. This indicates that there
//! is no further interest in the values being produced and work can be stopped.
use super::RecvError;
use std::{
cell::{Cell, Ref, RefCell},
future::Future,
ops::Deref,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};
pub struct Value<T> {
value: T,
version: u64,
}
struct State<T> {
value: RefCell<Value<T>>,
// I would be better to use HashSet here,
// but `Waker` doesn't implement it.
wakers: RefCell<Vec<Waker>>,
sender_exists: Cell<bool>,
}
impl<T> State<T> {
fn add_waker(&self, waker: &Waker) {
let mut wakers = self.wakers.borrow_mut();
if !wakers.iter().any(|w| waker.will_wake(w)) {
wakers.push(waker.clone());
}
}
fn wake_all(&self) {
for waker in self.wakers.borrow_mut().drain(..) {
waker.wake()
}
}
}
/// Error produced when sending a value fails.
#[derive(thiserror::Error, Debug)]
#[error(
"failed to send this value, as someone is currently holding a reference to the previous value"
)]
pub struct SendError<T>(pub T);
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function.
pub struct Sender<T> {
state: Rc<State<T>>,
}
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
pub struct Receiver<T> {
state: Rc<State<T>>,
seen_version: u64,
}
impl<T> Sender<T> {
/// Creates a new [`Receiver`] connected to this `Sender`.
///
/// All messages sent before this call to `subscribe` are initially marked
/// as seen by the new `Receiver`.
///
/// This method can be called even if there are no other receivers. In this
/// case, the channel is reopened.
pub fn subscribe(&self) -> Receiver<T> {
Receiver {
state: self.state.clone(),
seen_version: self.state.value.borrow().version,
}
}
/// Sends a new value via the channel, notifying all receivers.
///
/// This method fails if any of receivers is currently holding a reference
/// to the previous value.
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
if let Ok(mut value_ref) = self.state.value.try_borrow_mut() {
value_ref.value = value;
// It is ok to overflow as we check only the difference in version
// and having receivers stuck near 0 version when sender has exceeded u64 is extremely unlickely.
value_ref.version = value_ref.version.wrapping_add(1);
} else {
return Err(SendError(value));
}
self.state.wake_all();
Ok(())
}
/// Modifies the watched value in place, notifying all receivers.
///
/// This can useful for modifying the watched value,
/// without having to allocate a new instance.
/// This method permits sending values even when there are no receivers.
///
/// This method fails if any of receivers is currently holding a reference
/// to the previous value.
pub fn send_modify(&self, modify: impl FnOnce(&mut T)) -> Result<(), SendError<()>> {
let mut value_ref = self
.state
.value
.try_borrow_mut()
.map_err(|_| SendError(()))?;
modify(&mut value_ref.value);
value_ref.version = value_ref.version.wrapping_add(1);
self.state.wake_all();
Ok(())
}
/// Returns a reference to the most recently sent value.
///
/// Care must be taken not to hold a ref, when the sender is setting a new value.
/// This includes not holding a ref across await points and not explicitely yielding
/// control to other fibers while holding a ref.
///
/// Consider using [`Self::get`] or [`Self::get_cloned`] instead.
pub fn borrow(&self) -> ValueRef<T> {
ValueRef(self.state.value.borrow())
}
/// Returns a copy of the most recently sent value.
pub fn get(&self) -> T
where
T: Copy,
{
*self.borrow().deref()
}
/// Returns the most recently sent value cloned.
pub fn get_cloned(&self) -> T
where
T: Clone,
{
self.borrow().deref().clone()
}
/// Checks if the channel has been closed. This happens when all receivers
/// have dropped.
pub fn is_closed(&self) -> bool {
// Only the rc instance of this sender remains
Rc::strong_count(&self.state) == 1
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.state.sender_exists.set(false);
self.state.wake_all()
}
}
/// Returns a reference to the inner value.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long lived borrows could cause the produce half to block. It is recommended
/// to keep the borrow as short lived as possible.
pub struct ValueRef<'a, T>(Ref<'a, Value<T>>);
impl<'a, T> Deref for ValueRef<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0.value
}
}
/// Future that returns when a new value is published in [`Sender`].
pub struct Notification<'a, T> {
rx: &'a mut Receiver<T>,
}
impl<T> Receiver<T> {
/// Checks if this channel contains a message that this receiver has not yet
/// seen. The new value is not marked as seen.
///
/// Although this method is called `has_changed`, it does not check new
/// messages for equality, so this call will return true even if the new
/// message is equal to the old message.
pub fn has_changed(&self) -> bool {
self.state.value.borrow().version != self.seen_version
}
/// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// method sleeps until a new message is sent by the [`Sender`] connected to
/// this `Receiver`, or until the [`Sender`] is dropped.
///
/// This method returns an error if and only if the [`Sender`] is dropped.
pub fn changed(&mut self) -> Notification<T> {
Notification { rx: self }
}
/// Returns a reference to the most recently sent value.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`Self::changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
///
/// Care must be taken not to hold a ref, when the sender is setting a new value.
/// This includes not holding a ref across await points and not explicitely yielding
/// control to other fibers while holding a ref.
///
/// Consider using [`Self::get`] or [`Self::get_cloned`] instead.
pub fn borrow(&self) -> ValueRef<T> {
ValueRef(self.state.value.borrow())
}
/// Returns a copy of the most recently sent value.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`Self::changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
pub fn get(&self) -> T
where
T: Copy,
{
*self.borrow().deref()
}
/// Returns the most recently sent value cloned.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`Self::changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
pub fn get_cloned(&self) -> T
where
T: Clone,
{
self.borrow().deref().clone()
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
seen_version: self.state.value.borrow().version,
}
}
}
impl<'a, T> Future for Notification<'a, T> {
type Output = Result<(), RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.rx.state.sender_exists.get() {
return Poll::Ready(Err(RecvError));
}
let version = self.rx.state.value.borrow().version;
if version != self.rx.seen_version {
self.rx.seen_version = version;
Poll::Ready(Ok(()))
} else {
self.rx.state.add_waker(cx.waker());
Poll::Pending
}
}
}
/// Creates a new watch channel, returning the "send" and "receive" handles.
///
/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
/// Only the last value sent is made available to the [`Receiver`] half. All
/// intermediate values are dropped.
///
/// See [`super::watch`] for examples.
pub fn channel<T>(initial: T) -> (Sender<T>, Receiver<T>) {
let state = State {
value: RefCell::new(Value {
value: initial,
version: 0,
}),
wakers: Default::default(),
sender_exists: Cell::new(true),
};
let tx = Sender {
state: Rc::new(state),
};
let rx = tx.subscribe();
(tx, rx)
}
#[cfg(feature = "tarantool_test")]
mod tests {
#![allow(clippy::approx_constant)]
use super::*;
use crate::fiber;
use crate::fiber::r#async::timeout::{self, IntoTimeout};
use crate::test::{TestCase, TESTS};
use crate::test_name;
use futures::join;
use linkme::distributed_slice;
use std::time::Duration;
const _1_SEC: Duration = Duration::from_secs(1);
#[distributed_slice(TESTS)]
static RECEIVE_NOTIFICATION_SENT_BEFORE: TestCase = TestCase {
name: test_name!("receive_notification_sent_before"),
f: || {
let (tx, mut rx_1) = channel::<i32>(10);
let mut rx_2 = rx_1.clone();
// Subscribe should work same as rx clone
let mut rx_3 = tx.subscribe();
tx.send(20).unwrap();
assert_eq!(
fiber::block_on(async move {
let _ = join!(rx_1.changed(), rx_2.changed(), rx_3.changed());
(*rx_1.borrow(), *rx_2.borrow(), *rx_3.borrow())
}),
(20, 20, 20)
);
},
};
#[distributed_slice(TESTS)]
static RECEIVE_NOTIFICATION_SENT_AFTER: TestCase = TestCase {
name: test_name!("receive_notification_sent_after"),
f: || {
let (tx, mut rx_1) = channel::<i32>(10);
let mut rx_2 = rx_1.clone();
// Subscribe should work same as rx clone
let mut rx_3 = tx.subscribe();
let jh = fiber::start(move || {
fiber::block_on(async move {
let _ = join!(rx_1.changed(), rx_2.changed(), rx_3.changed());
(*rx_1.borrow(), *rx_2.borrow(), *rx_3.borrow())
})
});
tx.send(20).unwrap();
assert_eq!(jh.join(), (20, 20, 20))
},
};
#[distributed_slice(TESTS)]
static RECEIVE_MULTIPLE_NOTIFICATIONS: TestCase = TestCase {
name: test_name!("receive_multiple_notifications"),
f: || {
let (tx, mut rx_1) = channel::<i32>(10);
let jh = fiber::start(|| {
fiber::block_on(async {
rx_1.changed().await.unwrap();
*rx_1.borrow()
})
});
tx.send(1).unwrap();
assert_eq!(jh.join(), 1);
let jh = fiber::start(|| {
fiber::block_on(async {
rx_1.changed().await.unwrap();
*rx_1.borrow()
})
});
tx.send(2).unwrap();
assert_eq!(jh.join(), 2);
},
};
#[distributed_slice(TESTS)]
static RETAINS_ONLY_LAST_NOTIFICATION: TestCase = TestCase {
name: test_name!("retains_only_last_notification"),
f: || {
let (tx, mut rx_1) = channel::<i32>(10);
tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();
let v = fiber::block_on(async {
rx_1.changed().await.unwrap();
*rx_1.borrow()
});
assert_eq!(v, 3);
// No changes after
assert_eq!(
fiber::block_on(rx_1.changed().timeout(_1_SEC)),
Err(timeout::Expired)
);
},
};
#[distributed_slice(TESTS)]
static NOTIFICATION_RECEIVE_ERROR: TestCase = TestCase {
name: test_name!("notification_receive_error"),
f: || {
let (tx, mut rx_1) = channel::<i32>(10);
let jh = fiber::start(|| fiber::block_on(rx_1.changed()));
drop(tx);
assert_eq!(jh.join(), Err(RecvError));
},
};
#[distributed_slice(TESTS)]
static NOTIFICATION_RECEIVED_IN_CONCURRENT_FIBER: TestCase = TestCase {
name: test_name!("notification_received_in_concurrent_fiber"),
f: || {
let (tx, mut rx_1) = channel::<i32>(10);
let mut rx_2 = rx_1.clone();
let jh_1 = fiber::start(|| fiber::block_on(rx_1.changed()));
let jh_2 = fiber::start(|| fiber::block_on(rx_2.changed()));
tx.send(1).unwrap();
assert!(jh_1.join().is_ok());
assert!(jh_2.join().is_ok());
},
};
#[distributed_slice(TESTS)]
static SEND_MODIFY: TestCase = TestCase {
name: test_name!("send_modify"),
f: || {
let (tx, mut rx) = channel(vec![13]);
let jh = fiber::start(|| {
fiber::block_on(rx.changed()).unwrap();
rx.get_cloned()
});
tx.send_modify(|v| v.push(37)).unwrap();
assert_eq!(jh.join(), [13, 37]);
},
};
#[distributed_slice(TESTS)]
static SENDER_GET: TestCase = TestCase {
name: test_name!("sender_get"),
f: || {
let (tx, _) = channel(69);
assert_eq!(tx.get(), 69);
tx.send(420).unwrap();
assert_eq!(tx.get(), 420);
let (tx, _) = channel("foo".to_string());
assert_eq!(tx.get_cloned(), "foo");
tx.send("bar".into()).unwrap();
assert_eq!(tx.get_cloned(), "bar");
let (tx, mut rx) = channel(RefCell::new(vec![3.14]));
let value_ref = tx.borrow();
assert_eq!(*value_ref.borrow(), [3.14]);
// modify the watched value without notifying the watchers
// don't do that though
value_ref.borrow_mut().push(2.71);
assert_eq!(*tx.get_cloned().borrow(), [3.14, 2.71]);
let res = fiber::block_on(rx.changed().timeout(Duration::ZERO));
assert_eq!(res, Err(timeout::Expired));
// and sending fails until the ref is dropped
// really don't do that
tx.send_modify(|v| v.get_mut().push(1.61)).unwrap_err();
drop(value_ref);
tx.send_modify(|v| v.get_mut().push(1.61)).unwrap();
fiber::block_on(rx.changed()).unwrap();
assert_eq!(*rx.get_cloned().borrow(), [3.14, 2.71, 1.61]);
},
};
}
......@@ -165,12 +165,17 @@ pub mod log;
#[doc(hidden)]
pub mod msgpack;
pub mod net_box;
// Temporarily private as it is in development
#[allow(unused)]
mod network;
pub mod proc;
pub mod schema;
pub mod sequence;
pub mod session;
pub mod space;
pub mod sql;
#[cfg(feature = "tarantool_test")]
pub mod test;
pub mod transaction;
pub mod trigger;
pub mod tuple;
......
......@@ -635,7 +635,7 @@ pub fn value_slice(cursor: &mut Cursor<impl AsRef<[u8]>>) -> crate::Result<&[u8]
#[derive(Debug)]
pub struct ResponseError {
message: String,
pub(crate) message: String,
}
impl Display for ResponseError {
......
use std::rc::Rc;
use std::vec::IntoIter;
use crate::error::Error;
use crate::index::IteratorType;
use crate::network::protocol::codec;
use crate::tuple::{Encode, ToTupleBuffer, Tuple};
use super::inner::ConnInner;
use super::Options;
/// Remote index (a group of key values and pointers)
pub struct RemoteIndex {
conn_inner: Rc<ConnInner>,
space_id: u32,
index_id: u32,
}
impl RemoteIndex {
pub(crate) fn new(conn_inner: Rc<ConnInner>, space_id: u32, index_id: u32) -> Self {
RemoteIndex {
conn_inner,
space_id,
index_id,
}
}
/// The remote-call equivalent of the local call `Index::get(...)`
/// (see [details](../index/struct.Index.html#method.get)).
pub fn get<K>(&self, key: &K, options: &Options) -> Result<Option<Tuple>, Error>
where
K: ToTupleBuffer,
{
Ok(self
.select(
IteratorType::Eq,
key,
&Options {
offset: 0,
limit: Some(1),
..options.clone()
},
)?
.next())
}
/// The remote-call equivalent of the local call `Index::select(...)`
/// (see [details](../index/struct.Index.html#method.select)).
pub fn select<K>(
&self,
iterator_type: IteratorType,
key: &K,
options: &Options,
) -> Result<RemoteIndexIterator, Error>
where
K: ToTupleBuffer,
{
self.conn_inner.request(
|buf, sync| {
codec::encode_select(
buf,
sync,
self.space_id,
self.index_id,
options.limit.unwrap_or(u32::max_value()),
options.offset,
iterator_type,
key,
)
},
|buf, _| {
codec::decode_multiple_rows(buf, None).map(|result| RemoteIndexIterator {
inner: result.into_iter(),
})
},
options,
)
}
/// The remote-call equivalent of the local call `Space::update(...)`
/// (see [details](../index/struct.Index.html#method.update)).
pub fn update<K, Op>(
&self,
key: &K,
ops: &[Op],
options: &Options,
) -> Result<Option<Tuple>, Error>
where
K: ToTupleBuffer,
Op: Encode,
{
self.conn_inner.request(
|buf, sync| codec::encode_update(buf, sync, self.space_id, self.index_id, key, ops),
codec::decode_single_row,
options,
)
}
/// The remote-call equivalent of the local call `Space::upsert(...)`
/// (see [details](../index/struct.Index.html#method.upsert)).
pub fn upsert<T, Op>(
&self,
value: &T,
ops: &[Op],
options: &Options,
) -> Result<Option<Tuple>, Error>
where
T: ToTupleBuffer,
Op: Encode,
{
self.conn_inner.request(
|buf, sync| codec::encode_upsert(buf, sync, self.space_id, self.index_id, value, ops),
codec::decode_single_row,
options,
)
}
/// The remote-call equivalent of the local call `Space::delete(...)`
/// (see [details](../index/struct.Index.html#method.delete)).
pub fn delete<K>(&self, key: &K, options: &Options) -> Result<Option<Tuple>, Error>
where
K: ToTupleBuffer,
{
self.conn_inner.request(
|buf, sync| codec::encode_delete(buf, sync, self.space_id, self.index_id, key),
codec::decode_single_row,
options,
)
}
}
/// Remote index iterator. Can be used with `for` statement
pub struct RemoteIndexIterator {
inner: IntoIter<Tuple>,
}
impl Iterator for RemoteIndexIterator {
type Item = Tuple;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
use super::promise::Promise;
use super::recv_queue::RecvQueue;
use super::stream::ConnStream;
use super::{Conn, ConnTriggers};
use crate::coio::CoIOStream;
use crate::error::Error;
use crate::fiber::{is_cancelled, reschedule, set_cancellable, sleep, time, Cond, Fiber};
use crate::network::protocol::codec::{self, Header, Request};
use crate::network::protocol::options::{ConnOptions, Options};
use crate::network::protocol::send_queue::{self, SendQueue};
use crate::network::protocol::ConnState;
use crate::tuple::Decode;
use crate::unwrap_or;
use std::cell::{Cell, RefCell};
use std::io::{Cursor, Read, Write};
use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::time::{Duration, SystemTime};
pub struct ConnInner {
addrs: Vec<SocketAddr>,
options: ConnOptions,
state: Cell<ConnState>,
state_change_cond: Cond,
stream: RefCell<Option<ConnStream>>,
send_queue: SendQueue,
recv_queue: RecvQueue,
send_fiber: RefCell<Fiber<'static, Weak<ConnInner>>>,
recv_fiber: RefCell<Fiber<'static, Weak<ConnInner>>>,
triggers: RefCell<Option<Rc<dyn ConnTriggers>>>,
error: RefCell<Option<std::io::Error>>,
}
impl ConnInner {
pub fn new(
addrs: Vec<SocketAddr>,
options: ConnOptions,
triggers: Option<Rc<dyn ConnTriggers>>,
) -> Rc<Self> {
// init recv fiber
let mut recv_fiber = Fiber::new("_recv_worker", &mut recv_worker);
recv_fiber.set_joinable(true);
// init send fiber
let mut send_fiber = Fiber::new("_send_worker", &mut send_worker);
send_fiber.set_joinable(true);
// construct object
let conn_inner = Rc::new(ConnInner {
state: Cell::new(ConnState::Init),
state_change_cond: Cond::new(),
stream: RefCell::new(None),
send_queue: SendQueue::new(
options.send_buffer_size,
options.send_buffer_limit,
options.send_buffer_flush_interval,
),
recv_queue: RecvQueue::new(options.recv_buffer_size),
send_fiber: RefCell::new(send_fiber),
recv_fiber: RefCell::new(recv_fiber),
triggers: RefCell::new(triggers),
error: RefCell::new(None),
addrs,
options,
});
// start send/recv fibers
conn_inner
.send_fiber
.borrow_mut()
.start(Rc::downgrade(&conn_inner));
conn_inner
.recv_fiber
.borrow_mut()
.start(Rc::downgrade(&conn_inner));
conn_inner
}
pub fn is_connected(&self) -> bool {
matches!(self.state.get(), ConnState::Active)
}
pub fn wait_connected(self: &Rc<Self>, timeout: Option<Duration>) -> Result<bool, Error> {
let begin_ts = time();
loop {
let state = self.state.get();
match state {
ConnState::Init => {
self.init()?;
}
ConnState::Active => return Ok(true),
ConnState::Closed => return Ok(false),
_ => {
let timeout = match timeout {
None => None,
Some(timeout) => {
timeout.checked_sub(Duration::from_secs_f64(time() - begin_ts))
}
};
if !self.wait_state_changed(timeout) {
return Err(std::io::Error::from(std::io::ErrorKind::TimedOut).into());
}
}
};
}
}
pub fn request<Fp, Fc, R>(
self: &Rc<Self>,
request_producer: Fp,
response_consumer: Fc,
options: &Options,
) -> Result<R, Error>
where
Fp: FnOnce(&mut Cursor<Vec<u8>>, u64) -> Result<(), Error>,
Fc: FnOnce(&mut Cursor<Vec<u8>>, &Header) -> Result<R, Error>,
{
loop {
let state = self.state.get();
match state {
ConnState::Init => {
self.init()?;
}
ConnState::Active => {
return match self.send_queue.send(request_producer) {
Ok(sync) => self
.recv_queue
.recv(sync, response_consumer, options)
.map(|response| response.payload),
Err(err) => Err(self.handle_error(err).err().unwrap()),
};
}
ConnState::Error => self.disconnect(),
ConnState::ErrorReconnect => self.reconnect_or_fail()?,
ConnState::Closed => {
return Err(std::io::Error::from(std::io::ErrorKind::NotConnected).into())
}
_ => {
self.wait_state_changed(None);
}
};
}
}
pub(crate) fn request_async<I, O>(self: &Rc<Self>, request: I) -> crate::Result<Promise<O>>
where
I: Request,
O: for<'de> Decode<'de> + 'static,
{
loop {
match self.state.get() {
ConnState::Init => {
self.init()?;
}
ConnState::Active => {
let sync = self
.send_queue
.send(codec::request_producer(request))
.map_err(|err| self.handle_error(err).err().unwrap())?;
let promise = Promise::new(Rc::downgrade(self));
self.recv_queue.add_consumer(sync, promise.downgrade());
return Ok(promise);
}
ConnState::Error => self.disconnect(),
ConnState::ErrorReconnect => self.reconnect_or_fail()?,
ConnState::Closed => {
return Err(std::io::Error::from(std::io::ErrorKind::NotConnected).into())
}
_ => {
self.wait_state_changed(None);
}
}
}
}
pub fn close(self: &Rc<Self>) {
let state = self.state.get();
if matches!(state, ConnState::Connecting) || matches!(state, ConnState::Auth) {
let _ = self.wait_connected(None);
}
if !matches!(self.state.get(), ConnState::Closed) {
self.disconnect();
let mut send_fiber = self.send_fiber.borrow_mut();
send_fiber.cancel();
send_fiber.join();
let mut recv_fiber = self.recv_fiber.borrow_mut();
recv_fiber.cancel();
recv_fiber.join();
}
}
fn init(self: &Rc<Self>) -> Result<(), Error> {
match self.connect() {
Ok(_) => (),
Err(err) => {
return self.handle_error(err);
}
};
Ok(())
}
fn connect(self: &Rc<Self>) -> Result<(), Error> {
self.update_state(ConnState::Connecting);
// connect
let connect_timeout = self.options.connect_timeout;
let mut stream = if connect_timeout.subsec_nanos() == 0 && connect_timeout.as_secs() == 0 {
CoIOStream::connect(&*self.addrs)?
} else {
CoIOStream::connect_timeout(self.addrs.first().unwrap(), connect_timeout)?
};
// receive greeting msg
let salt = codec::decode_greeting(&mut stream)?;
// auth if required
if !self.options.user.is_empty() {
self.update_state(ConnState::Auth);
self.auth(&mut stream, &salt)?;
}
// if ok: put stream to result + set state to active
self.stream.replace(Some(ConnStream::new(stream)?));
self.update_state(ConnState::Active);
// call trigger (if available)
if let Some(triggers) = self.triggers.borrow().as_ref() {
triggers.on_connect(&Conn::downgrade(self.clone()))?;
}
Ok(())
}
fn auth(&self, stream: &mut CoIOStream, salt: &[u8]) -> Result<(), Error> {
let buf = Vec::new();
let mut cur = Cursor::new(buf);
// send auth request
let sync = self.send_queue.next_sync();
send_queue::write_to_buffer(&mut cur, sync, |buf, sync| {
codec::encode_auth(
buf,
self.options.user.as_str(),
self.options.password.as_str(),
salt,
sync,
)
})?;
stream.write_all(cur.get_ref())?;
// handle response
let response_len = rmp::decode::read_u32(stream)?;
{
let buffer = cur.get_mut();
buffer.clear();
buffer.reserve(response_len as usize);
stream.take(response_len as u64).read_to_end(buffer)?;
cur.set_position(0);
}
let header = codec::decode_header(&mut cur)?;
if header.status_code != 0 {
return Err(codec::decode_error(stream)?.into());
}
Ok(())
}
fn update_state(&self, state: ConnState) {
self.state.set(state);
self.state_change_cond.broadcast();
}
fn wait_state_changed(&self, timeout: Option<Duration>) -> bool {
match timeout {
Some(timeout) => self.state_change_cond.wait_timeout(timeout),
None => self.state_change_cond.wait(),
}
}
fn handle_error(&self, err: Error) -> Result<(), Error> {
if matches!(self.state.get(), ConnState::Closed) {
return Ok(());
}
match err {
Error::IO(err) => {
self.error.replace(Some(err));
self.update_state(ConnState::ErrorReconnect);
Ok(())
}
err => {
self.update_state(ConnState::Error);
Err(err)
}
}
}
fn reconnect_or_fail(self: &Rc<Self>) -> Result<(), Error> {
if matches!(self.state.get(), ConnState::Closed) {
return Ok(());
}
let error = self.error.replace(None).unwrap();
let reconnect_after = self.options.reconnect_after;
if reconnect_after.as_secs() == 0 && reconnect_after.subsec_nanos() == 0 {
self.update_state(ConnState::Error);
return Err(error.into());
} else {
sleep(reconnect_after);
match self.connect() {
Ok(_) => {}
Err(err) => {
self.handle_error(err)?;
}
}
}
Ok(())
}
fn disconnect(&self) {
if matches!(self.state.get(), ConnState::Closed) {
return;
}
self.update_state(ConnState::Closed);
if let Some(stream) = self.stream.borrow().as_ref() {
if stream.is_reader_acquired() {
self.recv_fiber.borrow().wakeup();
}
}
self.recv_queue.close();
self.send_queue.close();
self.stream.replace(None);
if let Some(triggers) = self.triggers.replace(None) {
triggers.on_disconnect();
}
}
}
pub fn flush_queue_to_stream(queue: &SendQueue, stream: &mut impl Write) -> std::io::Result<()> {
let start_ts = SystemTime::now();
let mut prev_data_size = 0u64;
loop {
if !queue.is_active.get() {
return Err(std::io::Error::from(std::io::ErrorKind::TimedOut));
}
let data_size = queue.back_buffer.borrow().position();
if data_size == 0 {
// await for data (if buffer is empty)
queue.swap_cond.wait();
continue;
}
if let Ok(elapsed) = start_ts.elapsed() {
if data_size > prev_data_size && elapsed <= queue.flush_interval {
prev_data_size = data_size;
reschedule();
continue;
}
}
queue.back_buffer.swap(&queue.front_buffer);
break;
}
// write front buffer contents to stream + clear front buffer
let mut buffer = queue.front_buffer.borrow_mut();
stream.write_all(buffer.get_ref())?;
buffer.set_position(0);
buffer.get_mut().clear();
Ok(())
}
#[allow(clippy::redundant_allocation, clippy::boxed_local)]
fn send_worker(conn: Box<Weak<ConnInner>>) -> i32 {
set_cancellable(true);
let weak_conn = *conn;
loop {
if is_cancelled() {
return 0;
}
let conn = unwrap_or!(weak_conn.upgrade(), return 0);
match conn.state.get() {
ConnState::Active => {
let mut writer = conn.stream.borrow().as_ref().unwrap().acquire_writer();
if let Err(e) = flush_queue_to_stream(&conn.send_queue, &mut writer) {
if is_cancelled() {
return 0;
}
conn.handle_error(e.into()).unwrap();
}
}
ConnState::Closed => return 0,
_ => {
conn.wait_state_changed(None);
}
}
}
}
#[allow(clippy::redundant_allocation, clippy::boxed_local)]
fn recv_worker(conn: Box<Weak<ConnInner>>) -> i32 {
set_cancellable(true);
let weak_conn = *conn;
loop {
if is_cancelled() {
return 0;
}
let conn = unwrap_or!(weak_conn.upgrade(), return 0);
match conn.state.get() {
ConnState::Active => {
let result = {
let mut reader = conn.stream.borrow().as_ref().unwrap().acquire_reader();
conn.recv_queue.pull(&mut reader)
};
match result {
Err(e) => {
if is_cancelled() {
return 0;
}
conn.handle_error(e).unwrap();
}
Ok(is_data_pulled) => {
if !is_data_pulled && conn.is_connected() {
conn.disconnect();
}
}
}
}
ConnState::Closed => return 0,
_ => {
conn.wait_state_changed(None);
}
}
}
}
//! Tarantool based client.
//! Can be used only from inside tarantool.
mod index;
mod inner;
mod promise;
mod recv_queue;
mod space;
mod stream;
use std::net::ToSocketAddrs;
use std::rc::Rc;
use std::time::Duration;
use self::promise::Promise;
use super::protocol::codec;
use super::protocol::options::{ConnOptions, Options};
use crate::error::Error;
use crate::tuple::{Decode, ToTupleBuffer, Tuple};
use inner::ConnInner;
/// Provides triggers for connect, disconnect and schema reload events.
pub trait ConnTriggers {
/// Defines a trigger for execution when a new connection is established, and authentication and schema fetch are
/// completed due to an event such as `connect`.
///
/// If the trigger execution fails and an exception happens, the connection’s state changes to `error`. In this
/// case, the connection is terminated.
fn on_connect(&self, conn: &Conn) -> Result<(), Error>;
/// Define a trigger for execution after a connection is closed.
fn on_disconnect(&self);
}
/// Connection to remote Tarantool server
pub struct Conn {
inner: Rc<ConnInner>,
is_master: bool,
}
impl Conn {
/// Create a new connection.
///
/// The connection is established on demand, at the time of the first request. It can be re-established
/// automatically after a disconnect (see [reconnect_after](struct.ConnOptions.html#structfield.reconnect_after) option).
/// The returned conn object supports methods for making remote requests, such as select, update or delete.
///
/// See also: [ConnOptions](struct.ConnOptions.html)
pub fn new(
addr: impl ToSocketAddrs,
options: ConnOptions,
triggers: Option<Rc<dyn ConnTriggers>>,
) -> Result<Self, Error> {
Ok(Conn {
inner: ConnInner::new(addr.to_socket_addrs()?.collect(), options, triggers),
is_master: true,
})
}
fn downgrade(inner: Rc<ConnInner>) -> Self {
Conn {
inner,
is_master: false,
}
}
/// Wait for connection to be active or closed.
///
/// Returns:
/// - `Ok(true)`: if active
/// - `Ok(true)`: if closed
/// - `Err(...TimedOut...)`: on timeout
pub fn wait_connected(&self, timeout: Option<Duration>) -> Result<bool, Error> {
self.inner.wait_connected(timeout)
}
/// Show whether connection is active or closed.
pub fn is_connected(&self) -> bool {
self.inner.is_connected()
}
/// Close a connection.
pub fn close(&self) {
self.inner.close()
}
/// Execute a PING command.
///
/// - `options` – the supported option is `timeout`
pub fn ping(&self, options: &Options) -> Result<(), Error> {
self.inner
.request(codec::encode_ping, |_, _| Ok(()), options)?;
Ok(())
}
/// Call a remote stored procedure.
///
/// `conn.call("func", &("1", "2", "3"))` is the remote-call equivalent of `func('1', '2', '3')`.
/// That is, `conn.call` is a remote stored-procedure call.
/// The return from `conn.call` is whatever the function returns.
pub fn call<T>(
&self,
function_name: &str,
args: &T,
options: &Options,
) -> Result<Option<Tuple>, Error>
where
T: ToTupleBuffer,
T: ?Sized,
{
self.inner.request(
|buf, sync| codec::encode_call(buf, sync, function_name, args),
codec::decode_call,
options,
)
}
/// Call a remote stored procedure without yielding.
///
/// If enqueuing a request succeeded a [`Promise`] is returned which will be
/// kept once a response is received.
pub fn call_async<A, R>(&self, func: &str, args: A) -> crate::Result<Promise<R>>
where
A: ToTupleBuffer,
R: for<'de> Decode<'de> + 'static,
{
self.inner.request_async(codec::Call(func, args))
}
/// Evaluates and executes the expression in Lua-string, which may be any statement or series of statements.
///
/// An execute privilege is required; if the user does not have it, an administrator may grant it with
/// `box.schema.user.grant(username, 'execute', 'universe')`.
///
/// To ensure that the return from `eval` is whatever the Lua expression returns, begin the Lua-string with the
/// word `return`.
pub fn eval<T>(
&self,
expression: &str,
args: &T,
options: &Options,
) -> Result<Option<Tuple>, Error>
where
T: ToTupleBuffer,
T: ?Sized,
{
self.inner.request(
|buf, sync| codec::encode_eval(buf, sync, expression, args),
codec::decode_call,
options,
)
}
/// Executes a series of lua statements on a remote host without yielding.
///
/// If enqueuing a request succeeded a [`Promise`] is returned which will be
/// kept once a response is received.
pub fn eval_async<A, R>(&self, expr: &str, args: A) -> crate::Result<Promise<R>>
where
A: ToTupleBuffer,
R: for<'de> Decode<'de> + 'static,
{
self.inner.request_async(codec::Eval(expr, args))
}
/// Remote execute of sql query.
pub fn execute(
&self,
sql: &str,
bind_params: &impl ToTupleBuffer,
options: &Options,
) -> Result<Vec<Tuple>, Error> {
self.inner.request(
|buf, sync| codec::encode_execute(buf, sync, sql, bind_params),
|buf, _| codec::decode_multiple_rows(buf, None),
options,
)
}
}
impl Drop for Conn {
fn drop(&mut self) {
if self.is_master {
self.close();
}
}
}
use std::cell::{Cell, UnsafeCell};
use std::io;
use std::rc::{Rc, Weak};
use std::time::{Duration, Instant};
use crate::clock::INFINITY;
use crate::error::Error;
use crate::fiber::Cond;
use crate::network::protocol::codec::Consumer;
use crate::tuple::Decode;
use crate::Result;
use super::inner::ConnInner;
type StdResult<T, E> = std::result::Result<T, E>;
/// An asynchronous [`net_box::Conn`](crate::net_box::Conn) response.
pub struct Promise<T> {
inner: Rc<InnerPromise<T>>,
}
impl<T> Promise<T> {
#[inline]
pub(crate) fn new(conn: Weak<ConnInner>) -> Self {
Self {
inner: Rc::new(InnerPromise {
conn,
cond: UnsafeCell::default(),
data: Cell::new(None),
}),
}
}
#[inline]
pub(crate) fn downgrade(&self) -> Weak<InnerPromise<T>> {
Rc::downgrade(&self.inner)
}
#[inline]
fn is_connected(&self) -> bool {
self.inner
.conn
.upgrade()
.map(|c| c.is_connected())
.unwrap_or(false)
}
#[inline]
fn check_connection(&self) -> Result<()> {
if self.is_connected() {
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::NotConnected).into())
}
}
/// Check if the promise is kept. Returns an error if one was received or if
/// connection is closed.
#[inline]
pub fn state(&self) -> State {
if let Some(res) = self.inner.data.take() {
let is_ok = res.is_ok();
self.inner.data.set(Some(res));
if is_ok {
State::Kept
} else {
State::ReceivedError
}
} else if self.is_connected() {
State::Pending
} else {
State::Disconnected
}
}
/// Check if the promise is kept and return the value. Consumes `self`.
/// If you only need to check the state of the promise, use the
/// [`state`](`Self::state`) method.
///
/// Does not yield.
///
/// Returns:
/// - [`Ok`]`(value)` if value is available.
/// - [`Err`]`(error)` if
/// - received a response with error
/// - connection was closed
/// - [`Pending`]`(self)` otherwise
///
/// [`Ok`]: TryGet::Ok
/// [`Err`]: TryGet::Err
/// [`Pending`]: TryGet::Pending
#[inline]
pub fn try_get(self) -> TryGet<T, Error> {
match (self.inner.data.take(), self.check_connection()) {
(Some(Ok(v)), _) => TryGet::Ok(v),
(Some(Err(e)), _) | (None, Err(e)) => TryGet::Err(e),
(None, Ok(())) => TryGet::Pending(self),
}
}
/// Waits indefinitely until the promise is kept or the connection is
/// closed. Consumes `self`.
#[inline]
pub fn wait(self) -> Result<T> {
match self.wait_timeout(INFINITY) {
TryGet::Ok(v) => Ok(v),
TryGet::Err(e) => Err(e),
TryGet::Pending(_) => unreachable!("100 years have passed, wake up"),
}
}
/// Waits for the promise to be kept. Consumes `self`.
///
/// Assume this yields.
///
/// Returns:
/// - [`Ok`]`(value)` if promise was successfully kept within time limit.
/// - [`Err`]`(error)`
/// - received a response with error
/// - connection was closed
/// - [`Pending`](self) on timeout
///
/// [`Ok`]: TryGet::Ok
/// [`Err`]: TryGet::Err
/// [`Pending`]: TryGet::Pending
pub fn wait_timeout(self, mut timeout: Duration) -> TryGet<T, Error> {
if let Some(res) = self.inner.data.take() {
return res.into();
}
loop {
if let Err(e) = self.check_connection() {
break TryGet::Err(e);
}
let last_awake = Instant::now();
unsafe { &*self.inner.cond.get() }.wait_timeout(timeout);
if let Some(res) = self.inner.data.take() {
break res.into();
}
timeout = timeout.saturating_sub(last_awake.elapsed());
if timeout.is_zero() {
break TryGet::Pending(self);
}
}
}
/// Replaces the contained `Cond` used for [`wait`] & [`wait_timeout`]
/// methods with the provided one. Useful if several promises need to be
/// waited on.
///
/// # Example
/// ```no_run
/// use tarantool::{fiber::Cond, net_box::{Conn, promise::{Promise, State}}};
/// use std::rc::Rc;
///
/// # fn get_conn(addr: &str) -> Conn { todo!() }
/// let c1: Conn = get_conn("addr1");
/// let mut p1: Promise<()> = c1.call_async("foo", ()).unwrap();
/// let c2: Conn = get_conn("addr2");
/// let mut p2: Promise<()> = c2.call_async("foo", ()).unwrap();
/// let cond = Rc::new(Cond::new());
/// p1.replace_cond(cond.clone());
/// p2.replace_cond(cond.clone());
/// cond.wait();
/// assert!(
/// matches!(p1.state(), State::Kept | State::ReceivedError) ||
/// matches!(p2.state(), State::Kept | State::ReceivedError)
/// )
/// ```
///
/// [`wait`]: Self::wait
/// [`wait_timeout`]: Self::wait_timeout
pub fn replace_cond(&mut self, cond: Rc<Cond>) -> Rc<Cond> {
unsafe { std::ptr::replace(self.inner.cond.get(), cond) }
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
Kept,
ReceivedError,
Pending,
Disconnected,
}
/// Represents all possible value that can be returned from [`Promise::try_get`]
/// or [`Promise::wait_timeout`] methods.
#[derive(Debug)]
pub enum TryGet<T, E> {
/// Promise was kept successfully.
Ok(T),
/// Promise will never be kept due to an error.
Err(E),
/// Promise yet is unresolved.
Pending(Promise<T>),
}
impl<T, E> TryGet<T, E> {
pub fn ok(self) -> Option<T> {
match self {
Self::Ok(v) => Some(v),
_ => None,
}
}
pub fn err(self) -> Option<E> {
match self {
Self::Err(e) => Some(e),
_ => None,
}
}
pub fn pending(self) -> Option<Promise<T>> {
match self {
Self::Pending(p) => Some(p),
_ => None,
}
}
/// Converts `self` into a nested [`Result`].
///
/// Returns
/// - `Ok(Ok(value))` in case of [`TryGet::Ok`]`(value)`.
/// - `Ok(Err(error))` in case of [`TryGet::Err`]`(error)`.
/// - `Err(promise)` in case of [`TryGet::Pending`]`(promise)`.
///
/// This function basically checks if the promise is resolved (`Ok`) or not
/// yet (`Err`).
///
/// [`Result`]: std::result::Result
#[inline(always)]
pub fn into_res(self) -> StdResult<StdResult<T, E>, Promise<T>> {
match self {
Self::Ok(v) => Ok(Ok(v)),
Self::Err(e) => Ok(Err(e)),
Self::Pending(p) => Err(p),
}
}
}
impl<T, E> From<StdResult<T, E>> for TryGet<T, E> {
fn from(r: StdResult<T, E>) -> Self {
match r {
Ok(v) => Self::Ok(v),
Err(e) => Self::Err(e),
}
}
}
impl<T, E> From<TryGet<T, E>> for StdResult<StdResult<T, E>, Promise<T>> {
#[inline(always)]
fn from(r: TryGet<T, E>) -> Self {
r.into_res()
}
}
impl<T> std::fmt::Debug for Promise<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Promise")
.field("state", &self.state())
.finish_non_exhaustive()
}
}
pub struct InnerPromise<T> {
conn: Weak<ConnInner>,
cond: UnsafeCell<Rc<Cond>>,
data: Cell<Option<Result<T>>>,
}
impl<T> InnerPromise<T> {
fn signal(&self) {
unsafe { &*self.cond.get() }.signal();
}
}
impl<T> Consumer for InnerPromise<T>
where
T: for<'de> Decode<'de>,
{
fn handle_error(&self, error: Error) {
self.data.set(Some(Err(error)));
self.signal();
}
fn handle_disconnect(&self) {
self.signal();
}
fn consume_data(&self, data: &[u8]) {
self.data.set(Some(T::decode(data)));
self.signal();
}
}
use std::cell::{Cell, RefCell, UnsafeCell};
use std::collections::{hash_map::Iter as HashMapIter, HashMap};
use std::io::{self, Cursor, Read};
use std::ops::Range;
use std::rc::{Rc, Weak};
use refpool::{Pool, PoolRef};
use rmp::decode;
use crate::error::Error;
use crate::fiber::{Cond, Latch};
use crate::network::protocol::codec::{
decode_error, decode_header, Consumer, Header, Response, Sync,
};
use crate::network::protocol::options::Options;
type Consumers = HashMap<Sync, Weak<dyn Consumer>>;
pub struct RecvQueue {
is_active: Cell<bool>,
buffer: RefCell<Cursor<Vec<u8>>>,
chunks: RefCell<Vec<Range<usize>>>,
cond_map: RefCell<HashMap<Sync, PoolRef<Cond>>>,
cond_pool: Pool<Cond>,
async_consumers: UnsafeCell<Consumers>,
read_offset: Cell<usize>,
read_completed_cond: Cond,
header_recv_result: RefCell<Option<Result<Header, Error>>>,
notification_lock: Latch,
}
impl RecvQueue {
pub fn new(buffer_size: usize) -> Self {
let buffer = vec![0; buffer_size];
RecvQueue {
is_active: Cell::new(true),
buffer: RefCell::new(Cursor::new(buffer)),
chunks: RefCell::new(Vec::with_capacity(1024)),
cond_map: RefCell::new(HashMap::new()),
cond_pool: Pool::new(1024),
async_consumers: UnsafeCell::new(HashMap::new()),
read_offset: Cell::new(0),
read_completed_cond: Cond::new(),
header_recv_result: RefCell::new(None),
notification_lock: Latch::new(),
}
}
pub fn recv<F, R>(
&self,
sync: u64,
payload_consumer: F,
options: &Options,
) -> Result<Response<R>, Error>
where
F: FnOnce(&mut Cursor<Vec<u8>>, &Header) -> Result<R, Error>,
{
if !self.is_active.get() {
return Err(io::Error::from(io::ErrorKind::ConnectionAborted).into());
}
let cond_ref = PoolRef::new(&self.cond_pool, Cond::new());
{
self.cond_map.borrow_mut().insert(sync, cond_ref.clone());
}
let is_signaled = match options.timeout {
None => cond_ref.wait(),
Some(timeout) => cond_ref.wait_timeout(timeout),
};
if is_signaled {
let result = {
let header = self.header_recv_result.replace(None).unwrap();
match header {
Ok(header) => {
if header.status_code != 0 {
return Err(decode_error(self.buffer.borrow_mut().by_ref())?.into());
}
payload_consumer(self.buffer.borrow_mut().by_ref(), &header)
.map(|payload| Response { payload, header })
}
Err(e) => return Err(e),
}
};
self.read_completed_cond.signal();
result
} else {
self.cond_map.borrow_mut().remove(&sync);
Err(io::Error::from(io::ErrorKind::TimedOut).into())
}
}
pub fn add_consumer(&self, sync: Sync, consumer: Weak<dyn Consumer>) {
unsafe { (*self.async_consumers.get()).insert(sync, consumer) };
}
pub fn get_consumer(&self, sync: Sync) -> Option<Rc<dyn Consumer>> {
unsafe { &mut *self.async_consumers.get() }
.remove(&sync)
.and_then(|c| c.upgrade())
}
pub fn iter_consumers(&self) -> HashMapIter<Sync, Weak<dyn Consumer>> {
unsafe { &*self.async_consumers.get() }.iter()
}
pub fn pull(&self, stream: &mut impl Read) -> Result<bool, Error> {
if !self.is_active.get() {
return Ok(false);
}
let mut chunks = self.chunks.borrow_mut();
let mut overflow_range = 0..0;
{
let mut buffer = self.buffer.borrow_mut();
let data_len = stream.read(&mut buffer.get_mut()[self.read_offset.get()..])?;
if data_len == 0 {
return Ok(false);
}
chunks.clear();
buffer.set_position(0);
loop {
let prefix_chunk_offset = buffer.position();
let chunk_len = decode::read_u32(&mut *buffer)? as usize;
let chunk_offset = buffer.position() as _;
let new_offset = chunk_offset + chunk_len;
if new_offset > data_len {
overflow_range = (prefix_chunk_offset as usize)..(data_len as usize);
break;
}
chunks.push(chunk_offset..new_offset);
if new_offset == data_len {
break;
}
buffer.set_position(new_offset as u64);
}
};
{
let _lock = self.notification_lock.lock();
for &Range { start, end } in chunks.iter() {
let header = {
let mut buffer = self.buffer.borrow_mut();
buffer.set_position(start as _);
decode_header(buffer.by_ref())?
};
let sync = header.sync;
let cond_ref = self.cond_map.borrow_mut().remove(&sync);
if let Some(cond_ref) = cond_ref {
self.header_recv_result.replace(Some(Ok(header)));
cond_ref.signal();
self.read_completed_cond.wait();
} else if let Some(consumer) = self.get_consumer(sync) {
let buffer = self.buffer.borrow();
let body_start = buffer.position() as usize;
consumer.consume(&header, &buffer.get_ref()[body_start..end]);
}
}
}
let new_read_offset = if !overflow_range.is_empty() {
let new_read_offset = overflow_range.end - overflow_range.start;
self.buffer
.borrow_mut()
.get_mut()
.copy_within(overflow_range, 0);
new_read_offset as usize
} else {
0
};
self.read_offset.set(new_read_offset);
Ok(true)
}
pub fn close(&self) {
let _lock = self.notification_lock.lock();
self.is_active.set(false);
for (_, cond_ref) in self.cond_map.borrow_mut().drain() {
self.header_recv_result
.replace(Some(Err(
io::Error::from(io::ErrorKind::ConnectionAborted).into()
)));
cond_ref.signal();
}
for consumer in self.iter_consumers().filter_map(|(_, c)| c.upgrade()) {
consumer.handle_disconnect();
}
}
}
use std::rc::Rc;
use crate::error::Error;
use crate::index::IteratorType;
use crate::network::protocol::options::Options;
use crate::tuple::{Encode, ToTupleBuffer, Tuple};
use super::index::{RemoteIndex, RemoteIndexIterator};
use super::inner::ConnInner;
use crate::network::protocol::codec;
/// Remote space
pub struct RemoteSpace {
conn_inner: Rc<ConnInner>,
space_id: u32,
}
impl RemoteSpace {
pub(crate) fn new(conn_inner: Rc<ConnInner>, space_id: u32) -> Self {
RemoteSpace {
conn_inner,
space_id,
}
}
/// Returns index with id = 0
#[inline(always)]
pub fn primary_key(&self) -> RemoteIndex {
RemoteIndex::new(self.conn_inner.clone(), self.space_id, 0)
}
/// The remote-call equivalent of the local call `Space::get(...)`
/// (see [details](../space/struct.Space.html#method.get)).
pub fn get<K>(&self, key: &K, options: &Options) -> Result<Option<Tuple>, Error>
where
K: ToTupleBuffer,
{
self.primary_key().get(key, options)
}
/// The remote-call equivalent of the local call `Space::select(...)`
/// (see [details](../space/struct.Space.html#method.select)).
pub fn select<K>(
&self,
iterator_type: IteratorType,
key: &K,
options: &Options,
) -> Result<RemoteIndexIterator, Error>
where
K: ToTupleBuffer,
{
self.primary_key().select(iterator_type, key, options)
}
/// The remote-call equivalent of the local call `Space::insert(...)`
/// (see [details](../space/struct.Space.html#method.insert)).
pub fn insert<T>(&self, value: &T, options: &Options) -> Result<Option<Tuple>, Error>
where
T: ToTupleBuffer,
{
self.conn_inner.request(
|buf, sync| codec::encode_insert(buf, sync, self.space_id, value),
codec::decode_single_row,
options,
)
}
/// The remote-call equivalent of the local call `Space::replace(...)`
/// (see [details](../space/struct.Space.html#method.replace)).
pub fn replace<T>(&self, value: &T, options: &Options) -> Result<Option<Tuple>, Error>
where
T: ToTupleBuffer,
{
self.conn_inner.request(
|buf, sync| codec::encode_replace(buf, sync, self.space_id, value),
codec::decode_single_row,
options,
)
}
/// The remote-call equivalent of the local call `Space::update(...)`
/// (see [details](../space/struct.Space.html#method.update)).
pub fn update<K, Op>(
&self,
key: &K,
ops: &[Op],
options: &Options,
) -> Result<Option<Tuple>, Error>
where
K: ToTupleBuffer,
Op: Encode,
{
self.primary_key().update(key, ops, options)
}
/// The remote-call equivalent of the local call `Space::upsert(...)`
/// (see [details](../space/struct.Space.html#method.upsert)).
pub fn upsert<T, Op>(
&self,
value: &T,
ops: &[Op],
options: &Options,
) -> Result<Option<Tuple>, Error>
where
T: ToTupleBuffer,
Op: Encode,
{
self.primary_key().upsert(value, ops, options)
}
/// The remote-call equivalent of the local call `Space::delete(...)`
/// (see [details](../space/struct.Space.html#method.delete)).
pub fn delete<K>(&self, key: &K, options: &Options) -> Result<Option<Tuple>, Error>
where
K: ToTupleBuffer,
{
self.primary_key().delete(key, options)
}
}
use std::cell::Cell;
use std::io::{self, Read, Write};
use std::os::unix::io::{IntoRawFd, RawFd};
use std::rc::Rc;
use crate::coio::{read, write, CoIOStream};
use crate::error::Error;
use crate::ffi::tarantool as ffi;
use crate::fiber::Cond;
pub struct ConnStream {
fd: RawFd,
reader_guard: Rc<ConnStreamGuard>,
writer_guard: Rc<ConnStreamGuard>,
}
impl ConnStream {
pub fn new(stream: CoIOStream) -> Result<Self, Error> {
Ok(ConnStream {
fd: stream.into_raw_fd(),
reader_guard: Rc::new(ConnStreamGuard {
is_acquired: Cell::new(false),
drop_cond: Cond::new(),
}),
writer_guard: Rc::new(ConnStreamGuard {
is_acquired: Cell::new(false),
drop_cond: Cond::new(),
}),
})
}
pub fn is_reader_acquired(&self) -> bool {
self.reader_guard.is_acquired.get()
}
pub fn acquire_reader(&self) -> ConnStreamReader {
self.reader_guard.wait();
self.reader_guard.is_acquired.set(true);
ConnStreamReader {
fd: self.fd,
reader_guard: self.reader_guard.clone(),
}
}
pub fn acquire_writer(&self) -> ConnStreamWriter {
self.writer_guard.wait();
self.writer_guard.is_acquired.set(true);
ConnStreamWriter {
fd: self.fd,
writer_guard: self.writer_guard.clone(),
}
}
}
struct ConnStreamGuard {
is_acquired: Cell<bool>,
drop_cond: Cond,
}
impl ConnStreamGuard {
fn wait(&self) {
if self.is_acquired.get() {
self.drop_cond.wait();
}
}
}
impl Drop for ConnStream {
fn drop(&mut self) {
self.reader_guard.wait();
self.writer_guard.wait();
unsafe { ffi::coio_close(self.fd) };
}
}
pub struct ConnStreamReader {
fd: RawFd,
reader_guard: Rc<ConnStreamGuard>,
}
impl Read for ConnStreamReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
read(self.fd, buf, None)
}
}
impl Drop for ConnStreamReader {
fn drop(&mut self) {
self.reader_guard.is_acquired.set(false);
self.reader_guard.drop_cond.signal();
}
}
pub struct ConnStreamWriter {
fd: RawFd,
writer_guard: Rc<ConnStreamGuard>,
}
impl Write for ConnStreamWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
write(self.fd, buf, None)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Drop for ConnStreamWriter {
fn drop(&mut self) {
self.writer_guard.is_acquired.set(false);
self.writer_guard.drop_cond.signal();
}
}
#[cfg(feature = "network_client")]
pub mod client;
pub mod protocol;