Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/tarantool-module
1 result
Show changes
Commits on Source (4)
......@@ -18,6 +18,9 @@
\- descriptions for stored procedures defined with `#[tarantool::proc]`.
- `proc::module_path` helper function for getting a path to the dynamically
linked object file in which the given symbol is defined.
- `msgpack::ArrayWriter` helper struct for generating msgpack arrays from
arbitrary serializable data.
- `msgpack::ValueIter` helper struct for iterating over msgpack values.
- `tarantool::network::client` alternative async network client.
- `tarantool::network::client::reconnect` reconnecting async network client based on `network::client`.
- `tarantool::network::protocol` sans-io (without transport layer) implementation of Tarantool Binary Protocol.
......
......@@ -16,6 +16,20 @@ pub fn impl_macro_attribute(attr: TS1, item: TS1) -> TS1 {
linkme,
should_panic,
} = ctx;
let fn_item = if fn_item.sig.asyncness.is_some() {
let body = fn_item.block;
quote! {
fn #fn_name() {
#tarantool::fiber::block_on(async { #body })
}
}
} else {
quote! {
#fn_item
}
};
quote! {
#[#linkme::distributed_slice(#section)]
#[linkme(crate = #linkme)]
......
......@@ -261,24 +261,20 @@ mod tests {
use super::*;
#[crate::test(tarantool = "crate")]
fn smoke() {
fiber::block_on(async {
let m = Mutex::new(());
drop(m.lock().await);
drop(m.lock().await);
})
async fn smoke() {
let m = Mutex::new(());
drop(m.lock().await);
drop(m.lock().await);
}
#[crate::test(tarantool = "crate")]
fn timeouts() {
fiber::block_on(async {
let m = Mutex::new(());
let _guard = m.lock().await;
let _guard_2 = async { ok(m.lock().await) }
.timeout(Duration::from_millis(50))
.await
.unwrap_err();
})
async fn timeouts() {
let m = Mutex::new(());
let _guard = m.lock().await;
let _guard_2 = async { ok(m.lock().await) }
.timeout(Duration::from_millis(50))
.await
.unwrap_err();
}
#[crate::test(tarantool = "crate")]
......
......@@ -424,6 +424,28 @@ pub use linkme;
/// assert!(false);
/// }
/// ```
///
/// You can also use `#[tarantool::test]` with `async` functions, in which case
/// the body of the test will be wrapped inside `fiber::block_on(async {})`
/// block. The following two tests are equivalent:
/// ```no_run
/// #[tarantool::test]
/// async fn async_test_1() {
/// assert_eq!(foo().await, 1);
/// }
///
/// #[tarantool::test]
/// fn async_test_2() {
/// tarantool::fiber::block_on(async {
/// assert_eq!(foo().await, 1);
/// })
/// }
///
/// async fn foo() -> i32 {
/// 1
/// }
/// ```
///
pub use tarantool_proc::test;
/// Return a global tarantool lua state.
......
use std::io::Cursor;
use std::io::{Read, Seek, SeekFrom};
use byteorder::{BigEndian, ReadBytesExt};
use super::tuple::ToTupleBuffer;
use super::tuple::{Decode, RawBytes, ToTupleBuffer};
use crate::Result;
pub fn skip_value(cur: &mut (impl Read + Seek)) -> Result<()> {
......@@ -126,3 +127,274 @@ pub fn write_array_len(
rmp::encode::write_array_len(w, len)?;
Ok(())
}
////////////////////////////////////////////////////////////////////////////////
// ArrayWriter
////////////////////////////////////////////////////////////////////////////////
/// A helper struct for serializing msgpack arrays from arbitrary serializable
/// types.
///
/// Call [`ArrayWriter::finish`] to finilize the msgpack array and get the
/// underlying `writer` struct.
///
/// # Example
/// ```
/// use tarantool::msgpack::ArrayWriter;
/// let mut array_writer = ArrayWriter::from_vec(vec![]);
/// array_writer.push(&1).unwrap();
/// array_writer.push(&("foo", "bar")).unwrap();
/// array_writer.push(&true).unwrap();
/// let cursor = array_writer.finish().unwrap();
/// let data = cursor.into_inner();
/// assert_eq!(data, b"\xdd\x00\x00\x00\x03\x01\x92\xa3foo\xa3bar\xc3");
/// ```
#[derive(Debug)]
pub struct ArrayWriter<W> {
/// The underlying writer, into which the data is written.
writer: W,
/// Stream position of `writer` when `self` was created.
start: u64,
/// Current length of the msgpack array.
///
/// NOTE: Msgpack max array size is 2³² - 1.
len: u32,
}
impl ArrayWriter<Cursor<Vec<u8>>> {
/// Create an `ArrayWriter` using a `Vec<u8>` as the underlying buffer.
#[track_caller]
#[inline(always)]
pub fn from_vec(buf: Vec<u8>) -> Self {
Self::new(Cursor::new(buf)).expect("allocation error")
}
}
impl<W> ArrayWriter<W>
where
W: std::io::Write + std::io::Seek,
{
const MAX_ARRAY_HEADER_SIZE: i64 = 5;
#[inline(always)]
pub fn new(mut writer: W) -> Result<Self> {
// Leave space for array size
let start = writer.stream_position()?;
writer.seek(SeekFrom::Current(Self::MAX_ARRAY_HEADER_SIZE))?;
Ok(Self {
start,
writer,
len: 0,
})
}
/// Stream position of `writer` when `self` was created.
#[inline(always)]
pub fn start(&self) -> u64 {
self.start
}
/// Current length of the msgpack array.
///
/// NOTE: Msgpack max array size is 2³² - 1.
#[inline(always)]
pub fn len(&self) -> u32 {
self.len
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.len == 0
}
#[inline(always)]
pub fn into_inner(self) -> W {
self.writer
}
/// Push a type that can be serialized as a msgpack value.
#[inline(always)]
pub fn push<T>(&mut self, v: &T) -> Result<()>
where
T: ::serde::Serialize + ?Sized,
{
rmp_serde::encode::write(&mut self.writer, &v)?;
self.len += 1;
Ok(())
}
/// Push a type representable as a tarantool tuple.
#[inline(always)]
pub fn push_tuple<T>(&mut self, v: &T) -> Result<()>
where
T: ToTupleBuffer + ?Sized,
{
v.write_tuple_data(&mut self.writer)?;
self.len += 1;
Ok(())
}
/// Push arbitrary bytes as a msgpack array element.
///
/// NOTE: The user must make sure to push a valid msgpack value.
#[inline(always)]
pub fn push_raw(&mut self, v: &[u8]) -> Result<()> {
self.writer.write_all(v)?;
self.len += 1;
Ok(())
}
/// Finilize the msgpack array and return the underlying writer.
pub fn finish(mut self) -> Result<W> {
use rmp::encode::RmpWrite;
self.writer.seek(SeekFrom::Start(self.start))?;
self.writer.write_u8(rmp::Marker::Array32.to_u8())?;
self.writer
.write_data_u32(self.len)
.map_err(rmp::encode::ValueWriteError::from)?;
Ok(self.writer)
}
}
////////////////////////////////////////////////////////////////////////////////
// ArrayIter
////////////////////////////////////////////////////////////////////////////////
/// A helper struct for iterating over msgpack values.
///
/// # Example
/// ```
/// use tarantool::msgpack::ValueIter;
/// let mut value_iter = ValueIter::from_array(b"\x93*\xc0\xa3yes").unwrap();
/// // You can decode the next value
/// assert_eq!(value_iter.decode_next::<i64>().map(Result::ok).flatten(), Some(42));
/// // Or just get the raw slice of bytes
/// assert_eq!(value_iter.next(), Some(&b"\xc0"[..]));
/// assert_eq!(value_iter.decode_next::<String>().map(Result::ok).flatten(), Some("yes".to_owned()));
///
/// // Returns None when there's no more values
/// assert_eq!(value_iter.decode_next::<String>().map(Result::ok), None);
/// assert_eq!(value_iter.next(), None);
/// ```
#[derive(Debug)]
pub struct ValueIter<'a> {
cursor: Cursor<&'a [u8]>,
}
impl<'a> ValueIter<'a> {
/// Return an iterator over elements of msgpack `array`, or error in case
/// `array` doesn't start with a valid msgpack array marker.
pub fn from_array(array: &'a [u8]) -> std::result::Result<Self, rmp::decode::ValueReadError> {
let mut cursor = Cursor::new(array);
// Don't care about length, will just exhaust all the values in the slice
rmp::decode::read_array_len(&mut cursor)?;
Ok(Self { cursor })
}
/// Return an iterator over msgpack values packed one after another in `data`.
pub fn new(data: &'a [u8]) -> Self {
Self {
cursor: Cursor::new(data),
}
}
/// Return an iterator over msgpack values packed one after another in `data`.
pub fn decode_next<T>(&mut self) -> Option<Result<T>>
where
T: Decode<'a>,
{
if self.cursor.position() as usize >= self.cursor.get_ref().len() {
return None;
}
let start = self.cursor.position() as usize;
if let Err(e) = skip_value(&mut self.cursor) {
return Some(Err(e));
}
let end = self.cursor.position() as usize;
debug_assert_ne!(start, end, "skip_value should've returned Err in this case");
let data = &self.cursor.get_ref()[start..end];
Some(T::decode(data))
}
pub fn into_inner(self) -> Cursor<&'a [u8]> {
self.cursor
}
}
impl<'a> Iterator for ValueIter<'a> {
type Item = &'a [u8];
#[inline(always)]
fn next(&mut self) -> Option<&'a [u8]> {
self.decode_next::<&RawBytes>()?.ok().map(|b| &**b)
}
}
////////////////////////////////////////////////////////////////////////////////
// test
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod test {
use super::*;
#[test]
fn array_writer() {
let mut aw = ArrayWriter::from_vec(Vec::new());
aw.push_tuple(&(420, "foo")).unwrap();
aw.push(&"bar").unwrap();
aw.push_raw(b"\xa3baz").unwrap();
let data = aw.finish().unwrap().into_inner();
eprintln!("{:x?}", &data);
let res: ((u32, String), String, String) = rmp_serde::from_slice(&data).unwrap();
assert_eq!(
res,
((420, "foo".to_owned()), "bar".to_owned(), "baz".to_owned())
);
}
#[test]
fn value_iter() {
let mut iter = ValueIter::new(b"");
assert_eq!(iter.next(), None);
let mut iter = ValueIter::new(b"*");
assert_eq!(iter.next(), Some(&b"*"[..]));
assert_eq!(iter.next(), None);
let err = ValueIter::from_array(b"").unwrap_err();
assert_eq!(err.to_string(), "failed to read MessagePack marker");
let mut iter = ValueIter::from_array(b"\x99").unwrap();
assert_eq!(iter.next(), None);
let mut iter = ValueIter::from_array(b"\x99*").unwrap();
assert_eq!(iter.next(), Some(&b"*"[..]));
assert_eq!(iter.next(), None);
let data = b"\x93*\x93\xc0\xc2\xc3\xa3sup";
let mut iter = ValueIter::from_array(data).unwrap();
let v: u32 = iter.decode_next().unwrap().unwrap();
assert_eq!(v, 42);
let v: Vec<Option<bool>> = iter.decode_next().unwrap().unwrap();
assert_eq!(v, [None, Some(false), Some(true)]);
let v: String = iter.decode_next().unwrap().unwrap();
assert_eq!(v, "sup");
let mut iter = ValueIter::from_array(data).unwrap();
let v = iter.next().unwrap();
assert_eq!(v, b"*");
let v = iter.next().unwrap();
assert_eq!(v, b"\x93\xc0\xc2\xc3");
let v = iter.next().unwrap();
assert_eq!(v, b"\xa3sup");
let mut iter = ValueIter::new(data);
let v: (u32, Vec<Option<bool>>, String) =
rmp_serde::from_slice(iter.next().unwrap()).unwrap();
assert_eq!(v, (42, vec![None, Some(false), Some(true)], "sup".into()));
}
}
......@@ -443,32 +443,26 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn connect() {
fiber::block_on(async {
let _client = Client::connect("localhost", TARANTOOL_LISTEN)
.await
.unwrap();
});
async fn connect() {
let _client = Client::connect("localhost", TARANTOOL_LISTEN)
.await
.unwrap();
}
#[crate::test(tarantool = "crate")]
fn connect_failure() {
fiber::block_on(async {
// Can be any other unused port
let err = Client::connect("localhost", 0).await.unwrap_err();
assert!(matches!(dbg!(err), Error::Tcp(_)))
});
async fn connect_failure() {
// Can be any other unused port
let err = Client::connect("localhost", 0).await.unwrap_err();
assert!(matches!(dbg!(err), Error::Tcp(_)))
}
#[crate::test(tarantool = "crate")]
fn ping() {
fiber::block_on(async {
let client = test_client().await;
async fn ping() {
let client = test_client().await;
for _ in 0..5 {
client.ping().timeout(Duration::from_secs(3)).await.unwrap();
}
});
for _ in 0..5 {
client.ping().timeout(Duration::from_secs(3)).await.unwrap();
}
}
#[crate::test(tarantool = "crate")]
......@@ -485,7 +479,7 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn execute() {
async fn execute() {
Space::find("test_s1")
.unwrap()
.insert(&(6001, "6001"))
......@@ -495,112 +489,100 @@ mod tests {
.insert(&(6002, "6002"))
.unwrap();
fiber::block_on(async {
let client = test_client().await;
let result = client
.execute(r#"SELECT * FROM "test_s1""#, &(), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert!(result.len() >= 2);
let result = client
.execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(
result.get(0).unwrap().decode::<(u64, String)>().unwrap(),
(6002, "6002".into())
);
});
let client = test_client().await;
let result = client
.execute(r#"SELECT * FROM "test_s1""#, &(), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert!(result.len() >= 2);
let result = client
.execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(
result.get(0).unwrap().decode::<(u64, String)>().unwrap(),
(6002, "6002".into())
);
}
#[crate::test(tarantool = "crate")]
fn call() {
fiber::block_on(async {
let client = test_client().await;
let result = client
.call("test_stored_proc", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32,)>().unwrap(), (3,));
});
async fn call() {
let client = test_client().await;
let result = client
.call("test_stored_proc", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32,)>().unwrap(), (3,));
}
#[crate::test(tarantool = "crate")]
fn invalid_call() {
fiber::block_on(async {
let client = test_client().await;
let err = client
.call("unexistent_proc", &())
.timeout(Duration::from_secs(3))
.await
.unwrap_err()
.to_string();
assert_eq!(err, "protocol error: service responded with error: Procedure 'unexistent_proc' is not defined");
});
async fn invalid_call() {
let client = test_client().await;
let err = client
.call("unexistent_proc", &())
.timeout(Duration::from_secs(3))
.await
.unwrap_err()
.to_string();
assert_eq!(err, "protocol error: service responded with error: Procedure 'unexistent_proc' is not defined");
}
#[crate::test(tarantool = "crate")]
fn eval() {
fiber::block_on(async {
let client = test_client().await;
let result = client
.eval("return ...", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32, i32)>().unwrap(), (1, 2));
});
async fn eval() {
let client = test_client().await;
let result = client
.eval("return ...", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32, i32)>().unwrap(), (1, 2));
}
/// A regression test for https://git.picodata.io/picodata/picodata/tarantool-module/-/merge_requests/302
#[crate::test(tarantool = "crate")]
fn client_count_regression() {
fiber::block_on(async {
let client = test_client().await;
// Should close sender and receiver fibers
let close_token = client.0.borrow_mut().close_token.take();
close_token.unwrap().close().unwrap();
// Receiver wakes and closes
fiber::r#yield().unwrap();
client.0.borrow().sender_waker.send(()).unwrap();
// Sender wakes and closes
fiber::r#yield().unwrap();
// Sender and receiver stopped and dropped their refs
assert_eq!(Rc::strong_count(&client.0), 1);
// Cloning a client produces 2 refs
let client_clone = client.clone();
assert_eq!(Rc::strong_count(&client.0), 2);
// Here if client checked by Rc refs <= 3 it would assume it is the last and set state to ClosedManually
drop(client_clone);
assert_eq!(Rc::strong_count(&client.0), 1);
// This would panic on unreachable if previous drop have set the state
client.check_state().unwrap_err();
});
async fn client_count_regression() {
let client = test_client().await;
// Should close sender and receiver fibers
let close_token = client.0.borrow_mut().close_token.take();
close_token.unwrap().close().unwrap();
// Receiver wakes and closes
fiber::r#yield().unwrap();
client.0.borrow().sender_waker.send(()).unwrap();
// Sender wakes and closes
fiber::r#yield().unwrap();
// Sender and receiver stopped and dropped their refs
assert_eq!(Rc::strong_count(&client.0), 1);
// Cloning a client produces 2 refs
let client_clone = client.clone();
assert_eq!(Rc::strong_count(&client.0), 2);
// Here if client checked by Rc refs <= 3 it would assume it is the last and set state to ClosedManually
drop(client_clone);
assert_eq!(Rc::strong_count(&client.0), 1);
// This would panic on unreachable if previous drop have set the state
client.check_state().unwrap_err();
}
#[crate::test(tarantool = "crate")]
fn concurrent_messages_one_fiber() {
fiber::block_on(async {
let client = test_client().await;
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
});
async fn concurrent_messages_one_fiber() {
let client = test_client().await;
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
}
}
use std::{cell::RefCell, rc::Rc, time::Duration};
use std::rc::Rc;
use super::Error;
use crate::fiber::r#async::Mutex;
......@@ -23,7 +23,7 @@ pub struct Client {
// Testing related code
#[cfg(feature = "internal_test")]
inject_error: Rc<RefCell<Option<super::Error>>>,
inject_error: Rc<std::cell::RefCell<Option<super::Error>>>,
#[cfg(feature = "internal_test")]
reconnect_count: Rc<AtomicUsize>,
}
......@@ -147,6 +147,7 @@ mod tests {
use crate::fiber::r#async::timeout::IntoTimeout as _;
use crate::network::AsClient as _;
use crate::test::util::TARANTOOL_LISTEN;
use std::time::Duration;
const _3_SEC: Duration = Duration::from_secs(3);
......@@ -161,54 +162,48 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn connect_failure() {
fiber::block_on(async {
// Can be any other unused port
let client = Client::new("localhost".into(), 0);
let err = client.ping().await.unwrap_err();
let correct_err = [
"tcp stream error: failed to connect to supplied address: Connection refused (os error 111)",
"tcp stream error: failed to connect to supplied address: Cannot assign requested address (os error 99)"
].contains(&err.to_string().as_str());
assert!(correct_err);
});
async fn connect_failure() {
// Can be any other unused port
let client = Client::new("localhost".into(), 0);
let err = client.ping().await.unwrap_err();
let correct_err = [
"tcp stream error: failed to connect to supplied address: Connection refused (os error 111)",
"tcp stream error: failed to connect to supplied address: Cannot assign requested address (os error 99)"
].contains(&err.to_string().as_str());
assert!(correct_err);
}
#[crate::test(tarantool = "crate")]
fn ping_after_reconnect() {
fiber::block_on(async {
let client = test_client();
async fn ping_after_reconnect() {
let client = test_client();
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 0);
client.reconnect();
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 1);
});
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 0);
client.reconnect();
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 1);
}
#[crate::test(tarantool = "crate")]
fn reconnect_now_vs_later() {
fiber::block_on(async {
let client = test_client();
// Client initializes at initial request
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 0);
async fn reconnect_now_vs_later() {
let client = test_client();
// Client initializes at initial request
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 0);
// Reconnect happens at the first send
client.reconnect();
assert_eq!(client.reconnect_count(), 0);
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 1);
// Reconnect happens at the first send
client.reconnect();
assert_eq!(client.reconnect_count(), 0);
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 1);
// Reconnect happens right away
client.reconnect_now().await.unwrap();
assert_eq!(client.reconnect_count(), 2);
});
// Reconnect happens right away
client.reconnect_now().await.unwrap();
assert_eq!(client.reconnect_count(), 2);
}
// More of an example of how this client can be used than a test
......@@ -265,33 +260,29 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn concurrent_messages_one_fiber() {
fiber::block_on(async {
let client = test_client();
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
});
async fn concurrent_messages_one_fiber() {
let client = test_client();
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
}
#[crate::test(tarantool = "crate")]
fn try_reconnect_only_once() {
fiber::block_on(async {
let client = Client::new("localhost".into(), 0);
client.ping().await.unwrap_err();
assert_eq!(client.reconnect_count(), 0);
async fn try_reconnect_only_once() {
let client = Client::new("localhost".into(), 0);
client.ping().await.unwrap_err();
assert_eq!(client.reconnect_count(), 0);
// If reconnect was requested once - try to reconnect only once
// even if reconnection fails
client.reconnect();
for _ in 0..10 {
client.ping().await.unwrap_err();
}
assert_eq!(client.reconnect_count(), 1);
});
// If reconnect was requested once - try to reconnect only once
// even if reconnection fails
client.reconnect();
for _ in 0..10 {
client.ping().await.unwrap_err();
}
assert_eq!(client.reconnect_count(), 1);
}
}
......@@ -317,37 +317,33 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn read() {
fiber::block_on(async {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
stream.read_exact(&mut buf).timeout(_10_SEC).await.unwrap();
});
async fn read() {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
stream.read_exact(&mut buf).timeout(_10_SEC).await.unwrap();
}
#[crate::test(tarantool = "crate")]
fn read_timeout() {
fiber::block_on(async {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
async fn read_timeout() {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
assert_eq!(
stream
.read_exact(&mut buf)
.timeout(_0_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
assert_eq!(
stream
.read_exact(&mut buf)
.timeout(_0_SEC)
.await
.unwrap_err()
.to_string(),
"deadline expired"
);
});
.unwrap_err()
.to_string(),
"deadline expired"
);
}
#[crate::test(tarantool = "crate")]
......