diff --git a/src/main.rs b/src/main.rs index 3adff6ce0138235a3d28c60aeccba7410b32dbff..1fbd27aee99755324c4b2178bab0d0cf93b242ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -396,6 +396,7 @@ fn init_handlers() { declare_cfunc!(traft::node::raft_sync_raft); declare_cfunc!(traft::failover::raft_update_peer); declare_cfunc!(traft::rpc::replication::proc_replication); + declare_cfunc!(traft::rpc::sharding::proc_sharding); } fn rm_tarantool_files(data_dir: &str) { diff --git a/src/traft/error.rs b/src/traft/error.rs index 7a448d056666722c30e7ac5252ddf64962f2fac2..9abf1617dfff5512aa5cbff80627bb95aceb6e66 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -29,6 +29,8 @@ pub enum Error { instance_rsid: String, requested_rsid: String, }, + #[error("operation request from non leader {actual}, current leader is {expected}")] + LeaderIdMismatch { expected: RaftId, actual: RaftId }, #[error("error during execution of lua code: {0}")] Lua(#[from] LuaError), #[error("{0}")] @@ -37,6 +39,8 @@ pub enum Error { NoPeerWithRaftId(RaftId), #[error("peer with id \"{0}\" not found")] NoPeerWithInstanceId(InstanceId), + #[error("leader is uknown yet")] + LeaderUnknown, #[error("other error: {0}")] Other(Box<dyn std::error::Error>), } diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs index 2c8259b425402bdc42983b603603cdec88ffdc30..921a8eec07c61aa1cbb308765cfeb783c0679b51 100644 --- a/src/traft/rpc/sharding.rs +++ b/src/traft/rpc/sharding.rs @@ -1,3 +1,68 @@ +use ::tarantool::{proc, tlua}; + +use crate::traft::storage::StateKey; +use crate::traft::{error::Error, node, RaftId, RaftTerm}; + +#[proc(packed_args)] +fn proc_sharding(req: Request) -> Result<Response, Error> { + let node = node::global()?; + let leader_id = node.status().leader_id.ok_or(Error::LeaderUnknown)?; + if req.leader_id != leader_id { + return Err(Error::LeaderIdMismatch { + expected: leader_id, + actual: req.leader_id, + }); + } + // TODO: check term matches + let _ = req.term; + + let storage = &node.storage; + let cfg = cfg::Cfg::from_storage(&storage.peers)?; + + let lua = ::tarantool::lua_state(); + // TODO: fix user's permissions + lua.exec("box.session.su('admin')")?; + // TODO: only done on instances with corresponding roles + lua.exec_with("vshard.storage.cfg(..., box.info.uuid)", &cfg) + .map_err(tlua::LuaError::from)?; + // TODO: only done on instances with corresponding roles + lua.exec_with("vshard.router.cfg(...)", &cfg) + .map_err(tlua::LuaError::from)?; + + // TODO: governor should decide who does this, and propose a OpDML entry + // afterwards + if !storage + .state + .get(StateKey::VshardBootstrapped)? + .unwrap_or(false) + { + lua.exec("vshard.router.bootstrap()")?; + storage.state.put(StateKey::VshardBootstrapped, &true)?; + } + + Ok(Response {}) +} + +/// Request to configure vshard. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct Request { + pub leader_id: RaftId, + pub term: RaftTerm, +} +impl ::tarantool::tuple::Encode for Request {} + +/// Response to [`sharding::Request`]. +/// +/// [`sharding::Request`]: Request +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct Response {} +impl ::tarantool::tuple::Encode for Response {} + +impl super::Request for Request { + const PROC_NAME: &'static str = crate::stringify_cfunc!(proc_sharding); + type Response = Response; +} + #[rustfmt::skip] pub mod cfg { use crate::traft::error::Error; diff --git a/src/traft/storage.rs b/src/traft/storage.rs index b8cde22e17369c82be469355394f65fb34a28f3b..3d36eacd7d9ebfdde0b07222614e0324abaa074b 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -77,6 +77,7 @@ define_str_enum! { /// An enumeration of builtin raft spaces pub enum StateKey { ReplicationFactor = "replication_factor", + VshardBootstrapped = "vshard_bootstrapped", } FromStr::Err = UnknownStateKey;