Skip to content
Snippets Groups Projects
Commit 62025021 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

feat: add support for async fn in #[tarantool::test]

parent b19e7881
No related branches found
No related tags found
1 merge request!314feat: add support for async fn in #[tarantool::test]
......@@ -16,6 +16,20 @@ pub fn impl_macro_attribute(attr: TS1, item: TS1) -> TS1 {
linkme,
should_panic,
} = ctx;
let fn_item = if fn_item.sig.asyncness.is_some() {
let body = fn_item.block;
quote! {
fn #fn_name() {
#tarantool::fiber::block_on(async { #body })
}
}
} else {
quote! {
#fn_item
}
};
quote! {
#[#linkme::distributed_slice(#section)]
#[linkme(crate = #linkme)]
......
......@@ -261,24 +261,20 @@ mod tests {
use super::*;
#[crate::test(tarantool = "crate")]
fn smoke() {
fiber::block_on(async {
let m = Mutex::new(());
drop(m.lock().await);
drop(m.lock().await);
})
async fn smoke() {
let m = Mutex::new(());
drop(m.lock().await);
drop(m.lock().await);
}
#[crate::test(tarantool = "crate")]
fn timeouts() {
fiber::block_on(async {
let m = Mutex::new(());
let _guard = m.lock().await;
let _guard_2 = async { ok(m.lock().await) }
.timeout(Duration::from_millis(50))
.await
.unwrap_err();
})
async fn timeouts() {
let m = Mutex::new(());
let _guard = m.lock().await;
let _guard_2 = async { ok(m.lock().await) }
.timeout(Duration::from_millis(50))
.await
.unwrap_err();
}
#[crate::test(tarantool = "crate")]
......
......@@ -424,6 +424,28 @@ pub use linkme;
/// assert!(false);
/// }
/// ```
///
/// You can also use `#[tarantool::test]` with `async` functions, in which case
/// the body of the test will be wrapped inside `fiber::block_on(async {})`
/// block. The following two tests are equivalent:
/// ```no_run
/// #[tarantool::test]
/// async fn async_test_1() {
/// assert_eq!(foo().await, 1);
/// }
///
/// #[tarantool::test]
/// fn async_test_2() {
/// tarantool::fiber::block_on(async {
/// assert_eq!(foo().await, 1);
/// })
/// }
///
/// async fn foo() -> i32 {
/// 1
/// }
/// ```
///
pub use tarantool_proc::test;
/// Return a global tarantool lua state.
......
......@@ -443,32 +443,26 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn connect() {
fiber::block_on(async {
let _client = Client::connect("localhost", TARANTOOL_LISTEN)
.await
.unwrap();
});
async fn connect() {
let _client = Client::connect("localhost", TARANTOOL_LISTEN)
.await
.unwrap();
}
#[crate::test(tarantool = "crate")]
fn connect_failure() {
fiber::block_on(async {
// Can be any other unused port
let err = Client::connect("localhost", 0).await.unwrap_err();
assert!(matches!(dbg!(err), Error::Tcp(_)))
});
async fn connect_failure() {
// Can be any other unused port
let err = Client::connect("localhost", 0).await.unwrap_err();
assert!(matches!(dbg!(err), Error::Tcp(_)))
}
#[crate::test(tarantool = "crate")]
fn ping() {
fiber::block_on(async {
let client = test_client().await;
async fn ping() {
let client = test_client().await;
for _ in 0..5 {
client.ping().timeout(Duration::from_secs(3)).await.unwrap();
}
});
for _ in 0..5 {
client.ping().timeout(Duration::from_secs(3)).await.unwrap();
}
}
#[crate::test(tarantool = "crate")]
......@@ -485,7 +479,7 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn execute() {
async fn execute() {
Space::find("test_s1")
.unwrap()
.insert(&(6001, "6001"))
......@@ -495,112 +489,100 @@ mod tests {
.insert(&(6002, "6002"))
.unwrap();
fiber::block_on(async {
let client = test_client().await;
let result = client
.execute(r#"SELECT * FROM "test_s1""#, &(), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert!(result.len() >= 2);
let result = client
.execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(
result.get(0).unwrap().decode::<(u64, String)>().unwrap(),
(6002, "6002".into())
);
});
let client = test_client().await;
let result = client
.execute(r#"SELECT * FROM "test_s1""#, &(), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert!(result.len() >= 2);
let result = client
.execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,), None)
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(
result.get(0).unwrap().decode::<(u64, String)>().unwrap(),
(6002, "6002".into())
);
}
#[crate::test(tarantool = "crate")]
fn call() {
fiber::block_on(async {
let client = test_client().await;
let result = client
.call("test_stored_proc", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32,)>().unwrap(), (3,));
});
async fn call() {
let client = test_client().await;
let result = client
.call("test_stored_proc", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32,)>().unwrap(), (3,));
}
#[crate::test(tarantool = "crate")]
fn invalid_call() {
fiber::block_on(async {
let client = test_client().await;
let err = client
.call("unexistent_proc", &())
.timeout(Duration::from_secs(3))
.await
.unwrap_err()
.to_string();
assert_eq!(err, "protocol error: service responded with error: Procedure 'unexistent_proc' is not defined");
});
async fn invalid_call() {
let client = test_client().await;
let err = client
.call("unexistent_proc", &())
.timeout(Duration::from_secs(3))
.await
.unwrap_err()
.to_string();
assert_eq!(err, "protocol error: service responded with error: Procedure 'unexistent_proc' is not defined");
}
#[crate::test(tarantool = "crate")]
fn eval() {
fiber::block_on(async {
let client = test_client().await;
let result = client
.eval("return ...", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32, i32)>().unwrap(), (1, 2));
});
async fn eval() {
let client = test_client().await;
let result = client
.eval("return ...", &(1, 2))
.timeout(Duration::from_secs(3))
.await
.unwrap();
assert_eq!(result.unwrap().decode::<(i32, i32)>().unwrap(), (1, 2));
}
/// A regression test for https://git.picodata.io/picodata/picodata/tarantool-module/-/merge_requests/302
#[crate::test(tarantool = "crate")]
fn client_count_regression() {
fiber::block_on(async {
let client = test_client().await;
// Should close sender and receiver fibers
let close_token = client.0.borrow_mut().close_token.take();
close_token.unwrap().close().unwrap();
// Receiver wakes and closes
fiber::r#yield().unwrap();
client.0.borrow().sender_waker.send(()).unwrap();
// Sender wakes and closes
fiber::r#yield().unwrap();
// Sender and receiver stopped and dropped their refs
assert_eq!(Rc::strong_count(&client.0), 1);
// Cloning a client produces 2 refs
let client_clone = client.clone();
assert_eq!(Rc::strong_count(&client.0), 2);
// Here if client checked by Rc refs <= 3 it would assume it is the last and set state to ClosedManually
drop(client_clone);
assert_eq!(Rc::strong_count(&client.0), 1);
// This would panic on unreachable if previous drop have set the state
client.check_state().unwrap_err();
});
async fn client_count_regression() {
let client = test_client().await;
// Should close sender and receiver fibers
let close_token = client.0.borrow_mut().close_token.take();
close_token.unwrap().close().unwrap();
// Receiver wakes and closes
fiber::r#yield().unwrap();
client.0.borrow().sender_waker.send(()).unwrap();
// Sender wakes and closes
fiber::r#yield().unwrap();
// Sender and receiver stopped and dropped their refs
assert_eq!(Rc::strong_count(&client.0), 1);
// Cloning a client produces 2 refs
let client_clone = client.clone();
assert_eq!(Rc::strong_count(&client.0), 2);
// Here if client checked by Rc refs <= 3 it would assume it is the last and set state to ClosedManually
drop(client_clone);
assert_eq!(Rc::strong_count(&client.0), 1);
// This would panic on unreachable if previous drop have set the state
client.check_state().unwrap_err();
}
#[crate::test(tarantool = "crate")]
fn concurrent_messages_one_fiber() {
fiber::block_on(async {
let client = test_client().await;
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
});
async fn concurrent_messages_one_fiber() {
let client = test_client().await;
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
}
}
......@@ -161,54 +161,48 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn connect_failure() {
fiber::block_on(async {
// Can be any other unused port
let client = Client::new("localhost".into(), 0);
let err = client.ping().await.unwrap_err();
let correct_err = [
"tcp stream error: failed to connect to supplied address: Connection refused (os error 111)",
"tcp stream error: failed to connect to supplied address: Cannot assign requested address (os error 99)"
].contains(&err.to_string().as_str());
assert!(correct_err);
});
async fn connect_failure() {
// Can be any other unused port
let client = Client::new("localhost".into(), 0);
let err = client.ping().await.unwrap_err();
let correct_err = [
"tcp stream error: failed to connect to supplied address: Connection refused (os error 111)",
"tcp stream error: failed to connect to supplied address: Cannot assign requested address (os error 99)"
].contains(&err.to_string().as_str());
assert!(correct_err);
}
#[crate::test(tarantool = "crate")]
fn ping_after_reconnect() {
fiber::block_on(async {
let client = test_client();
async fn ping_after_reconnect() {
let client = test_client();
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 0);
client.reconnect();
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 1);
});
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 0);
client.reconnect();
for _ in 0..2 {
client.ping().timeout(_3_SEC).await.unwrap();
}
assert_eq!(client.reconnect_count(), 1);
}
#[crate::test(tarantool = "crate")]
fn reconnect_now_vs_later() {
fiber::block_on(async {
let client = test_client();
// Client initializes at initial request
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 0);
async fn reconnect_now_vs_later() {
let client = test_client();
// Client initializes at initial request
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 0);
// Reconnect happens at the first send
client.reconnect();
assert_eq!(client.reconnect_count(), 0);
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 1);
// Reconnect happens at the first send
client.reconnect();
assert_eq!(client.reconnect_count(), 0);
client.ping().timeout(_3_SEC).await.unwrap();
assert_eq!(client.reconnect_count(), 1);
// Reconnect happens right away
client.reconnect_now().await.unwrap();
assert_eq!(client.reconnect_count(), 2);
});
// Reconnect happens right away
client.reconnect_now().await.unwrap();
assert_eq!(client.reconnect_count(), 2);
}
// More of an example of how this client can be used than a test
......@@ -265,33 +259,29 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn concurrent_messages_one_fiber() {
fiber::block_on(async {
let client = test_client();
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
});
async fn concurrent_messages_one_fiber() {
let client = test_client();
let mut ping_futures = vec![];
for _ in 0..10 {
ping_futures.push(client.ping());
}
for res in futures::future::join_all(ping_futures).await {
res.unwrap();
}
}
#[crate::test(tarantool = "crate")]
fn try_reconnect_only_once() {
fiber::block_on(async {
let client = Client::new("localhost".into(), 0);
client.ping().await.unwrap_err();
assert_eq!(client.reconnect_count(), 0);
async fn try_reconnect_only_once() {
let client = Client::new("localhost".into(), 0);
client.ping().await.unwrap_err();
assert_eq!(client.reconnect_count(), 0);
// If reconnect was requested once - try to reconnect only once
// even if reconnection fails
client.reconnect();
for _ in 0..10 {
client.ping().await.unwrap_err();
}
assert_eq!(client.reconnect_count(), 1);
});
// If reconnect was requested once - try to reconnect only once
// even if reconnection fails
client.reconnect();
for _ in 0..10 {
client.ping().await.unwrap_err();
}
assert_eq!(client.reconnect_count(), 1);
}
}
......@@ -317,37 +317,33 @@ mod tests {
}
#[crate::test(tarantool = "crate")]
fn read() {
fiber::block_on(async {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
stream.read_exact(&mut buf).timeout(_10_SEC).await.unwrap();
});
async fn read() {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
stream.read_exact(&mut buf).timeout(_10_SEC).await.unwrap();
}
#[crate::test(tarantool = "crate")]
fn read_timeout() {
fiber::block_on(async {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
async fn read_timeout() {
let mut stream = TcpStream::connect("localhost", TARANTOOL_LISTEN)
.timeout(_10_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
assert_eq!(
stream
.read_exact(&mut buf)
.timeout(_0_SEC)
.await
.unwrap();
// Read greeting
let mut buf = vec![0; 128];
assert_eq!(
stream
.read_exact(&mut buf)
.timeout(_0_SEC)
.await
.unwrap_err()
.to_string(),
"deadline expired"
);
});
.unwrap_err()
.to_string(),
"deadline expired"
);
}
#[crate::test(tarantool = "crate")]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment