diff --git a/src/main.rs b/src/main.rs index d16bcd54c5fcfbfd1c7fa200fb47d3481042f249..01baebfd8d4a3a5c4fb8ae004823e9f074d5c412 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use std::time::{Duration, Instant}; use storage::Clusterwide; use storage::{ClusterwideSpace, ProperyName}; use traft::rpc; -use traft::rpc::join; +use traft::rpc::{join, update_instance}; use traft::RaftSpaceAccess; use clap::StructOpt as _; @@ -24,7 +24,6 @@ use crate::tlog::set_log_level; use crate::traft::event::Event; use crate::traft::{event, node, InstanceId, Migration, OpDML}; use crate::traft::{LogicalClock, RaftIndex, TargetGradeVariant}; -use crate::traft::{UpdateInstanceRequest, UpdateInstanceResponse}; use traft::error::Error; mod app; @@ -1014,7 +1013,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces "initiating self-activation of {}", instance.instance_id ); - let req = UpdateInstanceRequest::new(instance.instance_id, cluster_id) + let req = update_instance::Request::new(instance.instance_id, cluster_id) .with_target_grade(TargetGradeVariant::Online) .with_failure_domain(args.failure_domain()); @@ -1024,10 +1023,10 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces let now = Instant::now(); let timeout = Duration::from_secs(10); match rpc::net_box_call(&leader_address, &req, timeout) { - Ok(UpdateInstanceResponse::Ok) => { + Ok(update_instance::Response::Ok) => { break; } - Ok(UpdateInstanceResponse::ErrNotALeader) => { + Ok(update_instance::Response::ErrNotALeader) => { tlog!(Warning, "failed to activate myself: not a leader, retry..."); fiber::sleep(Duration::from_millis(100)); continue; diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs index b3f0bd134612570dae98f59b36ca39df703d1150..85c24e4d10d39e75e015e63fd9ac4d34e05e00c1 100644 --- a/src/on_shutdown.rs +++ b/src/on_shutdown.rs @@ -8,9 +8,9 @@ use crate::traft::error::Error; use crate::traft::event; use crate::traft::node; use crate::traft::rpc; +use crate::traft::rpc::update_instance; use crate::traft::CurrentGradeVariant; use crate::traft::TargetGradeVariant; -use crate::traft::{UpdateInstanceRequest, UpdateInstanceResponse}; use crate::unwrap_ok_or; pub fn callback() { @@ -79,7 +79,7 @@ fn go_offline() -> traft::Result<()> { .cluster_id()? .ok_or_else(|| Error::other("missing cluster_id value in storage"))?; - let req = UpdateInstanceRequest::new(instance.instance_id, cluster_id) + let req = update_instance::Request::new(instance.instance_id, cluster_id) .with_target_grade(TargetGradeVariant::Offline); loop { @@ -110,8 +110,8 @@ fn go_offline() -> traft::Result<()> { continue; }; let res = match rpc::net_box_call(&leader_address, &req, Duration::MAX) { - Ok(UpdateInstanceResponse::Ok) => Ok(()), - Ok(UpdateInstanceResponse::ErrNotALeader) => Err(Error::NotALeader), + Ok(update_instance::Response::Ok) => Ok(()), + Ok(update_instance::Response::ErrNotALeader) => Err(Error::NotALeader), Err(e) => Err(e.into()), }; diff --git a/src/traft/mod.rs b/src/traft/mod.rs index f65c772a389d27ff51f3195d2e9d78bacab56794..a27fde1dd28df9cf73444fe9cd88145aae26533c 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -29,9 +29,7 @@ use protobuf::Message as _; pub use network::ConnectionPool; pub use raft_storage::RaftSpaceAccess; -pub use rpc::join; -pub use rpc::update_instance::Request as UpdateInstanceRequest; -pub use rpc::update_instance::Response as UpdateInstanceResponse; +pub use rpc::{join, update_instance}; pub use topology::Topology; use self::event::Event; @@ -821,7 +819,7 @@ pub trait ContextCoercion: Serialize + DeserializeOwned { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum TopologyRequest { Join(join::Request), - UpdateInstance(UpdateInstanceRequest), + UpdateInstance(update_instance::Request), } impl From<join::Request> for TopologyRequest { @@ -830,8 +828,8 @@ impl From<join::Request> for TopologyRequest { } } -impl From<UpdateInstanceRequest> for TopologyRequest { - fn from(a: UpdateInstanceRequest) -> Self { +impl From<update_instance::Request> for TopologyRequest { + fn from(a: update_instance::Request) -> Self { Self::UpdateInstance(a) } } diff --git a/src/traft/node.rs b/src/traft/node.rs index dfdb056ed99f64e12dd0141febd571c769460921..233149dec6a4d392992ae10209dbfc8efaa3c673 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -52,14 +52,13 @@ use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::Notify; use crate::traft::rpc::sharding::cfg::ReplicasetWeights; -use crate::traft::rpc::{join, replication, sharding, sync}; +use crate::traft::rpc::{join, replication, sharding, sync, update_instance}; use crate::traft::ConnectionPool; use crate::traft::LogicalClock; use crate::traft::Op; use crate::traft::RaftSpaceAccess; use crate::traft::Topology; use crate::traft::TopologyRequest; -use crate::traft::UpdateInstanceRequest; use super::OpResult; use super::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant}; @@ -1106,16 +1105,17 @@ fn raft_conf_change_loop( } // update instance's CurrentGrade - let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id.clone()) - .with_current_grade(instance.target_grade.into()); + let req = + update_instance::Request::new(instance.instance_id.clone(), cluster_id.clone()) + .with_current_grade(instance.target_grade.into()); tlog!(Info, - "handling UpdateInstanceRequest"; + "handling update_instance::Request"; "current_grade" => %req.current_grade.expect("just set"), "instance_id" => %req.instance_id, ); if let Err(e) = node.handle_topology_request_and_wait(req.into()) { tlog!(Warning, - "failed handling UpdateInstanceRequest: {e}"; + "failed handling update_instance::Request: {e}"; "instance_id" => %instance.instance_id, ); // TODO: don't hard code timeout @@ -1202,7 +1202,7 @@ fn raft_conf_change_loop( "instance_id" => &*instance.instance_id, ); - let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) + let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::raft_synced( instance.target_grade.incarnation, )); @@ -1269,7 +1269,7 @@ fn raft_conf_change_loop( ); } - let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) + let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::replicated( instance.target_grade.incarnation, )); @@ -1365,7 +1365,7 @@ fn raft_conf_change_loop( ); } - let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) + let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::sharding_initialized( instance.target_grade.incarnation, )); @@ -1469,10 +1469,11 @@ fn raft_conf_change_loop( tlog!(Info, "instance is online"; "instance_id" => &*instance_iid); } - let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::online( - instance.target_grade.incarnation, - )); + let req = + update_instance::Request::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::online( + instance.target_grade.incarnation, + )); node.handle_topology_request_and_wait(req.into())?; Ok(()) })() @@ -1491,7 +1492,7 @@ fn raft_conf_change_loop( } in to_online { let cluster_id = cluster_id.clone(); - let req = UpdateInstanceRequest::new(instance_id.clone(), cluster_id) + let req = update_instance::Request::new(instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::online(target_grade.incarnation)); node.handle_topology_request_and_wait(req.into())?; // TODO: change `Info` to `Debug` diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs index 6cdace745a6340539de08dce82eaa2e30c4ec51b..8e73d9027cdf3ba4879a3bdc46445b12162a42f1 100644 --- a/src/traft/rpc/expel.rs +++ b/src/traft/rpc/expel.rs @@ -1,6 +1,6 @@ use crate::traft; use crate::traft::Result; -use crate::traft::{error::Error, node, InstanceId, UpdateInstanceRequest}; +use crate::traft::{error::Error, node, rpc::update_instance, InstanceId}; crate::define_rpc_request! { fn proc_expel_on_leader(req: Request) -> Result<Response> { @@ -22,7 +22,7 @@ crate::define_rpc_request! { return Err(Error::NotALeader); } - let req2 = UpdateInstanceRequest::new(req.instance_id, req.cluster_id) + let req2 = update_instance::Request::new(req.instance_id, req.cluster_id) .with_target_grade(traft::TargetGradeVariant::Expelled); node.handle_topology_request_and_wait(req2.into())?; diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 85e90ceef337c33cbe17db07b4e8d6b7ec9ee4e0..5745d9eddd1b83744d3b3894eb5ebd90ac46f47c 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -1,11 +1,11 @@ use std::collections::{BTreeMap, HashMap, HashSet}; +use crate::rpc::update_instance; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; use crate::traft::Address; use crate::traft::FailureDomain; use crate::traft::Instance; -use crate::traft::UpdateInstanceRequest; use crate::traft::{CurrentGrade, CurrentGradeVariant, Grade, TargetGrade, TargetGradeVariant}; use crate::traft::{InstanceId, RaftId, ReplicasetId}; use crate::util::Uppercase; @@ -160,7 +160,7 @@ impl Topology { Ok((instance, advertise)) } - pub fn update_instance(&mut self, req: UpdateInstanceRequest) -> Result<Instance, String> { + pub fn update_instance(&mut self, req: update_instance::Request) -> Result<Instance, String> { let this = self as *const Self; let (instance, ..) = self @@ -171,7 +171,7 @@ impl Topology { if instance.current_grade == CurrentGradeVariant::Expelled && !matches!( req, - UpdateInstanceRequest { + update_instance::Request { target_grade: None, current_grade: Some(current_grade), failure_domain: None, @@ -232,7 +232,7 @@ mod tests { use crate::traft::replicaset_uuid; use crate::traft::FailureDomain; use crate::traft::Instance; - use crate::traft::UpdateInstanceRequest; + use crate::traft::rpc::update_instance; use crate::traft::{CurrentGrade, Grade, TargetGrade, TargetGradeVariant}; use pretty_assertions::assert_eq; @@ -253,17 +253,17 @@ mod tests { } trait ModifyUpdateInstanceRequest { - fn modify(self, req: UpdateInstanceRequest) -> UpdateInstanceRequest; + fn modify(self, req: update_instance::Request) -> update_instance::Request; } impl ModifyUpdateInstanceRequest for CurrentGrade { - fn modify(self, req: UpdateInstanceRequest) -> UpdateInstanceRequest { + fn modify(self, req: update_instance::Request) -> update_instance::Request { req.with_current_grade(self) } } impl ModifyUpdateInstanceRequest for TargetGradeVariant { - fn modify(self, req: UpdateInstanceRequest) -> UpdateInstanceRequest { + fn modify(self, req: update_instance::Request) -> update_instance::Request { req.with_target_grade(self) } } @@ -335,7 +335,7 @@ mod tests { ) => { $topology.update_instance( { - let req = UpdateInstanceRequest::new($instance_id.into(), "".into()); + let req = update_instance::Request::new($instance_id.into(), "".into()); $( let req = $current_grade.modify(req); $( let req = $target_grade.modify(req); )? @@ -353,7 +353,7 @@ mod tests { $failure_domain:expr $(,)? ) => { $topology.update_instance( - UpdateInstanceRequest::new($instance_id.into(), "".into()) + update_instance::Request::new($instance_id.into(), "".into()) .with_failure_domain($failure_domain), ) };