diff --git a/src/main.rs b/src/main.rs index 8709ebc4cdedfd692c8d5e3b744d4cb6a43b2079..9ef1af84ec00f205e5634a1bff114b4552a7ccaf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -464,7 +464,7 @@ fn init_handlers() { declare_cfunc!(discovery::proc_discover); declare_cfunc!(traft::node::proc_raft_interact); - declare_cfunc!(traft::node::proc_raft_join); + declare_cfunc!(traft::rpc::join::proc_raft_join); declare_cfunc!(traft::rpc::expel::proc_expel_on_leader); declare_cfunc!(traft::rpc::expel::redirect::proc_expel_redirect); declare_cfunc!(traft::rpc::sync::proc_sync_raft); diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 1ed152a727e9d61b0845c70e4957e73e2a0de97c..b15501430f189a54779a1b63663349d91e1468f3 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -28,6 +28,8 @@ use protobuf::Message as _; pub use network::ConnectionPool; pub use raft_storage::RaftSpaceAccess; +pub use rpc::join::Request as JoinRequest; +pub use rpc::join::Response as JoinResponse; pub use rpc::update_peer::Request as UpdatePeerRequest; pub use rpc::update_peer::Response as UpdatePeerResponse; use storage::ClusterSpace; @@ -833,31 +835,6 @@ impl From<UpdatePeerRequest> for TopologyRequest { } } -/////////////////////////////////////////////////////////////////////////////// -/// Request to join the cluster. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct JoinRequest { - pub cluster_id: String, - pub instance_id: Option<InstanceId>, - pub replicaset_id: Option<ReplicasetId>, - pub advertise_address: String, - pub failure_domain: FailureDomain, -} -impl Encode for JoinRequest {} - -/////////////////////////////////////////////////////////////////////////////// -/// Response to a JoinRequest -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct JoinResponse { - pub peer: Box<Peer>, - pub raft_group: Vec<Peer>, - pub box_replication: Vec<String>, - // TODO add later: - // Other parameters necessary for box.cfg() - // pub read_only: bool, -} -impl Encode for JoinResponse {} - /////////////////////////////////////////////////////////////////////////////// ::tarantool::define_str_enum! { diff --git a/src/traft/node.rs b/src/traft/node.rs index 34321a6c0b6f13e37f6c9f5a32539427740ff65f..2f224d3848e4a2ea42ddd1cb3808530db3b83618 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -59,7 +59,7 @@ use crate::traft::LogicalClock; use crate::traft::Op; use crate::traft::Topology; use crate::traft::TopologyRequest; -use crate::traft::{JoinRequest, JoinResponse, UpdatePeerRequest}; +use crate::traft::{JoinRequest, UpdatePeerRequest}; use crate::traft::{RaftSpaceAccess, Storage}; use super::OpResult; @@ -1631,47 +1631,3 @@ fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> { } Ok(()) } - -#[proc(packed_args)] -fn proc_raft_join(req: JoinRequest) -> traft::Result<JoinResponse> { - crate::tarantool::fiber_name("proc_raft_join"); - - let node = global()?; - - let cluster_id = node - .storage - .raft - .cluster_id()? - .expect("cluster_id is set on boot"); - - if req.cluster_id != cluster_id { - return Err(Error::ClusterIdMismatch { - instance_cluster_id: req.cluster_id, - cluster_cluster_id: cluster_id, - }); - } - - let peer = node.handle_topology_request_and_wait(req.into())?; - let box_replication = node - .storage - .peers - .replicaset_peer_addresses(&peer.replicaset_id, Some(peer.commit_index))?; - - // A joined peer needs to communicate with other nodes. - // Provide it the list of raft voters in response. - let mut raft_group = vec![]; - for raft_id in node.storage.raft.voters()?.unwrap_or_default().into_iter() { - match node.storage.peers.get(&raft_id) { - Err(e) => { - crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e); - } - Ok(peer) => raft_group.push(peer), - } - } - - Ok(JoinResponse { - peer, - raft_group, - box_replication, - }) -} diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs new file mode 100644 index 0000000000000000000000000000000000000000..31018c28095e402916f685a20d831ea7b03ff2d3 --- /dev/null +++ b/src/traft/rpc/join.rs @@ -0,0 +1,62 @@ +use crate::traft::{error::Error, node, FailureDomain, InstanceId, Peer, ReplicasetId, Result}; + +crate::define_rpc_request! { + fn proc_raft_join(req: Request) -> Result<Response> { + let node = node::global()?; + + let cluster_id = node + .storage + .raft + .cluster_id()? + .expect("cluster_id is set on boot"); + + if req.cluster_id != cluster_id { + return Err(Error::ClusterIdMismatch { + instance_cluster_id: req.cluster_id, + cluster_cluster_id: cluster_id, + }); + } + + let peer = node.handle_topology_request_and_wait(req.into())?; + let box_replication = node + .storage + .peers + .replicaset_peer_addresses(&peer.replicaset_id, Some(peer.commit_index))?; + + // A joined peer needs to communicate with other nodes. + // Provide it the list of raft voters in response. + let mut raft_group = vec![]; + for raft_id in node.storage.raft.voters()?.unwrap_or_default().into_iter() { + match node.storage.peers.get(&raft_id) { + Err(e) => { + crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e); + } + Ok(peer) => raft_group.push(peer), + } + } + + Ok(Response { + peer, + raft_group, + box_replication, + }) + } + + /// Request to join the cluster. + pub struct Request { + pub cluster_id: String, + pub instance_id: Option<InstanceId>, + pub replicaset_id: Option<ReplicasetId>, + pub advertise_address: String, + pub failure_domain: FailureDomain, + } + + /// Response to a [`join::Request`](Request). + pub struct Response { + pub peer: Box<Peer>, + pub raft_group: Vec<Peer>, + pub box_replication: Vec<String>, + // Other parameters necessary for box.cfg() + // TODO + } +} diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs index e3834116110575a260fa2f0750e247d765b0debb..cceec9c4192d218e0a3746f968c0273ace923636 100644 --- a/src/traft/rpc/mod.rs +++ b/src/traft/rpc/mod.rs @@ -11,6 +11,7 @@ use std::time::Duration; use serde::de::DeserializeOwned; pub mod expel; +pub mod join; pub mod migration; pub mod replication; pub mod sharding; @@ -27,11 +28,6 @@ pub trait Request: Encode + DecodeOwned { type Response: Encode + DeserializeOwned + Debug + 'static; } -impl Request for super::JoinRequest { - const PROC_NAME: &'static str = crate::stringify_cfunc!(super::node::proc_raft_join); - type Response = super::JoinResponse; -} - #[inline(always)] pub fn net_box_call<R>( address: impl ToSocketAddrs + Display,