Skip to content
Snippets Groups Projects
app.rs 8.33 KiB
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::error::Error;
use std::fmt::Display;
use std::time::Duration;
use tarantool::error::Error as TarantoolError;
use tarantool::index::IndexOptions;
use tarantool::space::{Space, SpaceCreateOptions};

pub trait Event {
    fn wait_timeout(&self, t: Duration) -> bool;
    fn set(&self);
}

pub trait AppError: Error {}
impl<T> AppError for T where T: Error {}

#[allow(dead_code)]
enum DDL {
    CreateSpace {
        name: &'static str,
        opts: SpaceCreateOptions,
    },
    CreateIndex {
        space_name: &'static str,
        name: &'static str,
        opts: IndexOptions,
    },
}

pub struct SchemaUpate {
    version: &'static str,
    ddl: Box<[DDL]>,
}

pub trait App {
    const NAME: &'static str;
    const VERSION: &'static str;

    fn schema() -> Box<[SchemaUpate]> {
        Box::new([SchemaUpate {
            version: "v0.0.0",
            ddl: [].into(),
        }])
    }

    /// initialization, no yields, no I/O
    fn new(rpc: &mut impl RpcRegister) -> Result<Box<Self>, Box<dyn AppError>>;

    /// blocking long-running call, can yield
    fn run(
        &mut self,
        stop: &impl Event,
        storage: &impl Storage,
        rpc: &impl RpcCall,
    ) -> Result<(), Box<dyn AppError>>;
}

pub trait Storage {
    // Cluster-wide Box API
    fn create_space(
        &mut self,
        name: &str,
        opts: &SpaceCreateOptions,
    ) -> Result<Space, TarantoolError>;
    fn space(&self, name: &str) -> Space;
    fn drop_space(&mut self, name: &str);
    // ...
}
pub trait RpcRequest: DeserializeOwned {}
impl<T> RpcRequest for T where T: DeserializeOwned {}

pub trait RpcResponse: Serialize {}
impl<T> RpcResponse for T where T: Serialize {}

pub trait RpcError: Display {}
impl<T> RpcError for T where T: Display {}

pub trait RpcRegister {
    #[allow(unused_variables)]
    fn register<Req, Res, E>(
        &mut self,
        name: &str,
        handler: impl FnMut(Req, &dyn Storage) -> Result<Res, E>,
    ) where
        Res: RpcResponse,
        Req: RpcRequest,
        E: RpcError,
    {
        todo!()
    }
}

pub trait RpcCall {
    #[allow(unused_variables)]
    fn call<Req, Res, E>(&self, name: &str, request: Req) -> Result<Res, E>
    where
        Res: RpcResponse,
        Req: RpcRequest,
        E: RpcError,
    {
        todo!()
    }
}
pub trait Rpc: RpcCall + RpcRegister {}

#[allow(clippy::all)]
#[allow(unused_variables)]
mod example {
    use super::*;
    use serde::{Deserialize, Serialize};
    use std::fmt::{Display, Formatter};
    use std::time::Duration;
    use tarantool::fiber;
    use thiserror::Error;

    mod rpc {
        use super::*;

        pub struct SomeRpc();

        impl RpcCall for SomeRpc {}

        impl Rpc for SomeRpc {}

        impl RpcRegister for SomeRpc {
            fn register<Req, Res, E>(
                &mut self,
                name: &str,
                handler: impl FnMut(Req, &dyn Storage) -> Result<Res, E>,
            ) where
                Res: RpcResponse,
                Req: RpcRequest,
                E: RpcError,
            {
                todo!()
            }
        }
    }

    mod storage {
        use super::*;

        pub struct SomeStorage();

        impl Storage for SomeStorage {
            fn create_space(
                &mut self,
                name: &str,
                opts: &SpaceCreateOptions,
            ) -> Result<Space, TarantoolError> {
                todo!()
            }

            fn space(&self, name: &str) -> tarantool::space::Space {
                todo!()
            }

            fn drop_space(&mut self, name: &str) {
                todo!()
            }
        }
    }

    mod event {
        use std::time::Duration;

        pub struct SomeEvent();
        impl super::Event for SomeEvent {
            fn wait_timeout(&self, t: Duration) -> bool {
                todo!()
            }
            fn set(&self) {
                todo!()
            }
        }
    }

    mod user_code {
        use super::*;

        pub struct App {
            greeting: String,
        }

        #[derive(Serialize, Deserialize)]
        struct HelloRequest {
            name: String,
        }

        #[derive(Serialize, Deserialize)]
        struct HelloResponse {
            greeting: String,
        }

        #[derive(Debug, Error)]
        #[allow(dead_code)]
        struct MyAppError {
            message: String,
        }

        impl Display for MyAppError {
            #[allow(unused_variables)]
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                unimplemented!();
            }
        }

        impl App {
            fn hello(
                &self,
                request: HelloRequest,
                storage: &dyn Storage,
            ) -> Result<HelloResponse, MyAppError> {
                let name = request.name;

                Ok(if Self::is_greeted(storage, &name) {
                    HelloResponse {
                        greeting: format!("{} again, {}!", self.greeting, name),
                    }
                } else {
                    Self::set_greeted(storage, &name);
                    HelloResponse {
                        greeting: format!("{}, {}!", self.greeting, name),
                    }
                })
            }

            fn is_greeted(storage: &dyn Storage, name: &str) -> bool {
                storage.space("hello_log").get(&(name,)).unwrap().is_some()
            }

            fn set_greeted(storage: &dyn Storage, name: &str) {
                storage.space("hello_log").insert(&(name,)).unwrap();
            }
        }

        impl From<tarantool::error::Error> for Box<dyn AppError> {
            fn from(_: tarantool::error::Error) -> Self {
                todo!()
            }
        }

        impl super::App for App {
            const NAME: &'static str = "MyApp";
            const VERSION: &'static str = "0.1.0";

            fn schema() -> Box<[SchemaUpate]> {
                Box::new([
                    // ! Only add elements, never delete or change.
                    SchemaUpate {
                        version: "1.0.0",
                        ddl: Box::new([
                            DDL::CreateSpace {
                                name: "hello_log",
                                opts: SpaceCreateOptions::default(),
                            },
                            DDL::CreateIndex {
                                space_name: "hello_log",
                                name: "pk",
                                opts: IndexOptions::default(),
                            },
                        ]),
                    },
                ])
            }

            fn new(rpc: &mut impl RpcRegister) -> Result<Box<Self>, Box<dyn AppError>> {
                let app = App {
                    greeting: "Hello".into(),
                };

                rpc.register("hello", |request, storage| app.hello(request, storage));

                Ok(Box::new(app))
            }

            fn run(
                &mut self,
                stop: &impl Event,
                storage: &impl Storage,
                rpc: &impl RpcCall,
            ) -> Result<(), Box<dyn AppError>> {
                // setup()

                loop {
                    if stop.wait_timeout(Duration::from_millis(100)) {
                        break;
                    }

                    storage.space("hello_log").delete(&("nobody",)).unwrap();

                    let _: Result<HelloResponse, MyAppError> =
                        rpc.call("hello", HelloRequest { name: "Bob".into() });
                }

                // teardown()

                Ok(())
            }
        }
    }

    struct RunAppError();

    fn exec_cluster_ddl(version: &str, ddl: &[DDL]) -> Result<(), RunAppError> {
        todo!()
    }

    #[allow(dead_code)]
    fn run_app() -> Result<(), RunAppError> {
        let mut rpc = rpc::SomeRpc();

        let mut app: user_code::App = *App::new(&mut rpc).unwrap();
        for s in user_code::App::schema().as_ref() {
            let SchemaUpate { version, ddl } = s;
            exec_cluster_ddl(version, ddl)?
        }

        let event = &event::SomeEvent();
        let storage = &storage::SomeStorage();

        let jh = fiber::defer_proc(move || app.run(event, storage, &rpc).unwrap());
        event.set();

        jh.join();
        Ok(())
    }
}