diff --git a/src/app.rs b/src/app.rs new file mode 100644 index 0000000000000000000000000000000000000000..8c30f263c18b8c0a0eb236a55a8c36b45707c9ed --- /dev/null +++ b/src/app.rs @@ -0,0 +1,297 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::error::Error; +use std::fmt::Display; +use std::time::Duration; + +pub trait Event { + fn wait_timeout(&self, t: Duration) -> bool; + fn set(&self); +} + +pub trait AppError: Error {} +impl<T> AppError for T where T: Error {} + +pub struct SchemaUpate { + version: &'static str, + ddl: &'static dyn FnOnce() -> Result<(), Box<dyn AppError>>, +} + +pub trait App { + const NAME: &'static str; + const VERSION: &'static str; + const SCHEMA: &'static [SchemaUpate] = &[SchemaUpate { + version: "v0.0.0", + ddl: &|| Ok(()), + }]; + + /// initialization, no yields, no I/O + fn new(rpc: &mut impl RpcRegister) -> Result<Box<Self>, Box<dyn AppError>>; + + /// blocking long-running call, can yield + fn run( + &mut self, + stop: &impl Event, + storage: &impl Storage, + rpc: &impl RpcCall, + ) -> Result<(), Box<dyn AppError>>; +} + +pub trait Storage { + // Cluster-wide Box API + fn create_space(&mut self); + fn space(&self, name: &str) -> tarantool::space::Space; + fn drop_space(&mut self, name: &str); + // ... +} + +pub trait RpcRequest: DeserializeOwned {} +impl<T> RpcRequest for T where T: DeserializeOwned {} + +pub trait RpcResponse: Serialize {} +impl<T> RpcResponse for T where T: Serialize {} + +pub trait RpcError: Display {} +impl<T> RpcError for T where T: Display {} + +pub trait RpcRegister { + #[allow(unused_variables)] + fn register<Req, Res, E>( + &mut self, + name: &str, + handler: impl FnMut(Req, &dyn Storage) -> Result<Res, E>, + ) where + Res: RpcResponse, + Req: RpcRequest, + E: RpcError, + { + todo!() + } +} + +pub trait RpcCall { + #[allow(unused_variables)] + fn call<Req, Res, E>(&self, name: &str, request: Req) -> Result<Res, E> + where + Res: RpcResponse, + Req: RpcRequest, + E: RpcError, + { + todo!() + } +} +pub trait Rpc: RpcCall + RpcRegister {} + +#[allow(clippy::all)] +#[allow(unused_variables)] +mod example { + use super::*; + use serde::{Deserialize, Serialize}; + use std::fmt::{Display, Formatter}; + use std::time::Duration; + use tarantool::fiber; + use thiserror::Error; + + mod rpc { + use super::*; + + pub struct SomeRpc(); + + impl RpcCall for SomeRpc {} + + impl Rpc for SomeRpc {} + + impl RpcRegister for SomeRpc { + fn register<Req, Res, E>( + &mut self, + name: &str, + handler: impl FnMut(Req, &dyn Storage) -> Result<Res, E>, + ) where + Res: RpcResponse, + Req: RpcRequest, + E: RpcError, + { + todo!() + } + } + } + + mod storage { + use super::*; + + pub struct SomeStorage(); + + impl Storage for SomeStorage { + fn create_space(&mut self) { + todo!() + } + + fn space(&self, name: &str) -> tarantool::space::Space { + todo!() + } + + fn drop_space(&mut self, name: &str) { + todo!() + } + } + } + + mod event { + use std::time::Duration; + + pub struct SomeEvent(); + impl super::Event for SomeEvent { + fn wait_timeout(&self, t: Duration) -> bool { + todo!() + } + fn set(&self) { + todo!() + } + } + } + + mod user_code { + use super::*; + + pub struct App { + greeting: String, + } + + #[derive(Serialize, Deserialize)] + struct HelloRequest { + name: String, + } + + #[derive(Serialize, Deserialize)] + struct HelloResponse { + greeting: String, + } + + #[derive(Debug, Error)] + #[allow(dead_code)] + struct MyAppError { + message: String, + } + + impl Display for MyAppError { + #[allow(unused_variables)] + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + unimplemented!(); + } + } + + impl App { + fn hello( + &self, + request: HelloRequest, + storage: &dyn Storage, + ) -> Result<HelloResponse, MyAppError> { + let name = request.name; + + Ok(if Self::is_greeted(storage, &name) { + HelloResponse { + greeting: format!("{} again, {}!", self.greeting, name), + } + } else { + Self::set_greeted(storage, &name); + HelloResponse { + greeting: format!("{}, {}!", self.greeting, name), + } + }) + } + + fn is_greeted(storage: &dyn Storage, name: &str) -> bool { + storage.space("hello_log").get(&(name,)).unwrap().is_some() + } + + fn set_greeted(storage: &dyn Storage, name: &str) { + storage.space("hello_log").insert(&(name,)).unwrap(); + } + } + + impl From<tarantool::error::Error> for Box<dyn AppError> { + fn from(_: tarantool::error::Error) -> Self { + todo!() + } + } + + impl super::App for App { + const NAME: &'static str = "MyApp"; + const VERSION: &'static str = "0.1.0"; + const SCHEMA: &'static [SchemaUpate] = &[ + // ! Only add elements, never delete or change. + SchemaUpate { + version: "1.0.0", + ddl: &|| { + tarantool::schema::space::create_space("hello_log", &Default::default())? + .create_index("pk", &Default::default())?; + Ok(()) + }, + }, + ]; + + fn new(rpc: &mut impl RpcRegister) -> Result<Box<Self>, Box<dyn AppError>> { + let app = App { + greeting: "Hello".into(), + }; + + rpc.register("hello", |request, storage| app.hello(request, storage)); + + Ok(Box::new(app)) + } + + fn run( + &mut self, + stop: &impl Event, + storage: &impl Storage, + rpc: &impl RpcCall, + ) -> Result<(), Box<dyn AppError>> { + // setup() + + loop { + if stop.wait_timeout(Duration::from_millis(100)) { + break; + } + + storage.space("hello_log").delete(&("nobody",)).unwrap(); + + let _: Result<HelloResponse, MyAppError> = + rpc.call("hello", HelloRequest { name: "Bob".into() }); + } + + // teardown() + + Ok(()) + } + } + } + + struct RunAppError(); + + fn exec_cluster_ddl( + version: &str, + ddl: &dyn FnOnce() -> Result<(), Box<dyn AppError>>, + ) -> Result<(), RunAppError> { + todo!() + } + + #[allow(dead_code)] + fn run_app() -> Result<(), RunAppError> { + let mut rpc = rpc::SomeRpc(); + + let mut app: user_code::App = *App::new(&mut rpc).unwrap(); + for s in user_code::App::SCHEMA { + let SchemaUpate { version, ddl } = s; + exec_cluster_ddl(version, *ddl)? + } + + let event = &event::SomeEvent(); + let storage = &storage::SomeStorage(); + + let jh = fiber::defer_proc(move || app.run(event, storage, &rpc).unwrap()); + event.set(); + + jh.join(); + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index fa8928e31176338bbf0acfaf060bc3bf56773b9c..68a57d94c4f9eefcb33534ceb4831baab3822c1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use ::tarantool::tlua; use ::tarantool::tuple::{FunctionArgs, FunctionCtx, Tuple}; use indoc::indoc; +mod app; mod args; mod error; mod message;