From f5835629ea62c5777de5f7e056d056cd07db44cb Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 20 Oct 2022 14:07:01 +0300
Subject: [PATCH] feat(sharding): vshard finalizing

---
 src/traft/node.rs         | 124 +++++++++++++++++++++++++++++++-------
 src/traft/rpc/sharding.rs |  18 ++++--
 src/traft/storage.rs      |   8 +++
 3 files changed, 124 insertions(+), 26 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index ad378318a5..712e92ced7 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -14,7 +14,6 @@ use ::tarantool::error::TransactionError;
 use ::tarantool::fiber;
 use ::tarantool::fiber::{Cond, Mutex};
 use ::tarantool::proc;
-use tarantool::space::UpdateOps;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::cell::Cell;
@@ -24,17 +23,18 @@ use std::convert::TryFrom;
 use std::rc::Rc;
 use std::time::Duration;
 use std::time::Instant;
+use tarantool::space::UpdateOps;
 
 use crate::kvcell::KVCell;
 use crate::r#loop::{FlowControl, Loop};
 use crate::stringify_cfunc;
+use crate::traft::storage::ClusterSpace;
 use crate::traft::ContextCoercion as _;
 use crate::traft::InstanceId;
 use crate::traft::Peer;
 use crate::traft::RaftId;
 use crate::traft::RaftIndex;
 use crate::traft::RaftTerm;
-use crate::traft::storage::ClusterSpace;
 use crate::warn_or_panic;
 use crate::{unwrap_ok_or, unwrap_some_or};
 use ::tarantool::util::IntoClones as _;
@@ -48,9 +48,10 @@ use crate::traft::event;
 use crate::traft::event::Event;
 use crate::traft::failover;
 use crate::traft::notify::Notify;
+use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
 use crate::traft::rpc::{replication, sharding};
 use crate::traft::storage::peer_field;
-use crate::traft::storage::StateKey;
+use crate::traft::storage::{State, StateKey};
 use crate::traft::ConnectionPool;
 use crate::traft::LogicalClock;
 use crate::traft::Op;
@@ -1048,6 +1049,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // raft sync
+        // TODO: putting each stage in a different function
+        // will make the control flow more readable
         let to_sync = peers
             .iter()
             .find(|peer| peer.has_grades(CurrentGrade::Offline, TargetGrade::Online));
@@ -1101,12 +1104,16 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // replication
+        // TODO: putting each stage in a different function
+        // will make error handling much easier
         let to_replicate = peers
             .iter()
             .find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
         if let Some(peer) = to_replicate {
             let replicaset_id = &peer.replicaset_id;
             let replicaset_iids = unwrap_ok_or!(
+                // TODO: filter out Offline & Expelled peers
+                // TODO: use `peers` instead
                 storage.peers.replicaset_fields::<peer_field::InstanceId>(replicaset_id),
                 Err(e) => {
                     tlog!(Warning, "failed reading replicaset instances: {e}";
@@ -1201,8 +1208,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 );
 
                 // TODO: don't hard code timeout
-                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1))
-                    .unwrap();
+                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                 continue 'governor;
             }
         }
@@ -1214,11 +1220,15 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // sharding
+        // TODO: putting each stage in a different function
+        // will make error handling much easier
         let need_sharding = peers
             .iter()
             .any(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
         if need_sharding {
             let peer_ids = unwrap_ok_or!(
+                // TODO: filter out Offline & Expelled peers
+                // TODO: use `peers` instead
                 storage.peers.peers_fields::<peer_field::InstanceId>(),
                 Err(e) => {
                     tlog!(Warning, "failed reading peer instances: {e}");
@@ -1236,13 +1246,14 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 sharding::Request {
                     leader_id: raft_id,
                     term,
+                    weights: None,
                 },
                 // TODO: don't hard code timeout
                 Duration::from_secs(3),
             );
             let res = unwrap_ok_or!(res,
                 Err(e) => {
-                    tlog!(Warning, "failed to configure sharding: {e}");
+                    tlog!(Warning, "failed to initialize sharding: {e}");
                     continue 'governor;
                 }
             );
@@ -1264,7 +1275,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                         );
                     }
                     Err(e) => {
-                        tlog!(Warning, "failed to configure sharding: {e}";
+                        tlog!(Warning, "failed to initialize sharding: {e}";
                             "instance_id" => &*peer_iid,
                         );
 
@@ -1276,29 +1287,82 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 }
             }
 
-            tlog!(Info, "configured sharding");
+            tlog!(Info, "sharding is initialized");
             continue 'governor;
         }
 
         ////////////////////////////////////////////////////////////////////////
-        // ???
-        // TODO
-
-        let to_online = peers
+        // sharding weights
+        let maybe_need_weights_update = peers
             .iter()
-            .find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
-        if let Some(peer) = to_online {
-            let instance_id = peer.instance_id.clone();
-            let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
-            let req = UpdatePeerRequest::new(instance_id, cluster_id)
-                .with_current_grade(CurrentGrade::Online);
-            let res = node.handle_topology_request_and_wait(req.into());
+            .any(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
+        if maybe_need_weights_update {
+            let res = if let Some(new_weights) = get_new_weights(&peers, &storage.state) {
+                (|| -> Result<(), Error> {
+                    let res = call_all(
+                        &mut pool,
+                        peers.iter().map(|peer| peer.instance_id.clone()),
+                        sharding::Request {
+                            leader_id: raft_id,
+                            term,
+                            weights: Some(new_weights.clone()),
+                        },
+                        // TODO: don't hard code timeout
+                        Duration::from_secs(3),
+                    )?;
+
+                    let cluster_id = storage.raft.cluster_id()?.unwrap();
+                    for (peer_iid, resp) in res {
+                        let cluster_id = cluster_id.clone();
+                        let peer_iid_2 = peer_iid.clone();
+                        resp.and_then(move |sharding::Response {}| {
+                            let req = UpdatePeerRequest::new(peer_iid_2, cluster_id)
+                                .with_current_grade(CurrentGrade::Online);
+                            node.handle_topology_request_and_wait(req.into())
+                        })?;
+                        // TODO: change `Info` to `Debug`
+                        tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);
+                    }
+
+                    node.propose_and_wait(
+                        // TODO: OpDML::update with just the changes
+                        traft::OpDML::replace(
+                            ClusterSpace::State,
+                            &(StateKey::ReplicasetWeights, new_weights),
+                        )?,
+                        // TODO: don't hard code the timeout
+                        Duration::from_secs(3),
+                    )??;
+                    Ok(())
+                })()
+            } else {
+                (|| -> Result<(), Error> {
+                    let cluster_id = storage.raft.cluster_id()?.unwrap();
+                    let to_online = peers.iter().filter(|peer| {
+                        peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online)
+                    });
+                    for Peer { instance_id, .. } in to_online {
+                        let cluster_id = cluster_id.clone();
+                        let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id)
+                            .with_current_grade(CurrentGrade::Online);
+                        node.handle_topology_request_and_wait(req.into())?;
+                        // TODO: change `Info` to `Debug`
+                        tlog!(Info, "peer is online"; "instance_id" => &**instance_id);
+                    }
+                    Ok(())
+                })()
+            };
             if let Err(e) = res {
-                tlog!(Warning, "failed to set peer online: {e}";
-                    "instance_id" => &*peer.instance_id,
-                );
+                tlog!(Warning, "updating sharding weights failed: {e}");
+
+                // TODO: don't hard code timeout
+                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                continue 'governor;
             }
+
+            tlog!(Info, "sharding is configured");
+            continue 'governor;
         }
 
@@ -1350,6 +1414,22 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         Ok(rx.into_iter().take(peer_count).collect())
     }
+
+    fn get_new_weights(peers: &[Peer], state: &State) -> Option<ReplicasetWeights> {
+        let replication_factor = state.replication_factor().expect("storage error");
+        let mut replicaset_weights = state.replicaset_weights().expect("storage error");
+        let mut replicaset_sizes = HashMap::new();
+        let mut weights_changed = false;
+        for Peer { replicaset_id, .. } in peers {
+            let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
+            *replicaset_size += 1;
+            if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
+                weights_changed = true;
+                *replicaset_weights.get_mut(replicaset_id).unwrap() = 1.;
+            }
+        }
+        weights_changed.then_some(replicaset_weights)
+    }
 }
 
 fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs
index 507467a4d7..aabcd75991 100644
--- a/src/traft/rpc/sharding.rs
+++ b/src/traft/rpc/sharding.rs
@@ -17,7 +17,11 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
     let _ = req.term;
     let storage = &node.storage;
 
-    let cfg = cfg::Cfg::from_storage(&storage)?;
+    let cfg = if let Some(weights) = req.weights {
+        cfg::Cfg::new(&storage.peers, weights)?
+    } else {
+        cfg::Cfg::from_storage(storage)?
+    };
 
     let lua = ::tarantool::lua_state();
     // TODO: fix user's permissions
@@ -48,6 +52,7 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
 pub struct Request {
     pub leader_id: RaftId,
     pub term: RaftTerm,
+    pub weights: Option<cfg::ReplicasetWeights>,
 }
 
 impl ::tarantool::tuple::Encode for Request {}
@@ -67,7 +72,7 @@ impl super::Request for Request {
 pub mod cfg {
     use crate::traft::error::Error;
     use crate::traft::storage::peer_field;
-    use crate::traft::storage::Storage;
+    use crate::traft::storage::{Peers, Storage};
 
     use ::tarantool::tlua;
 
@@ -118,12 +123,17 @@ pub mod cfg {
     pub type Weight = f64;
 
     impl Cfg {
+        #[inline]
         pub fn from_storage(storage: &Storage) -> Result<Self, Error> {
+            let replicaset_weights = storage.state.replicaset_weights()?;
+            Self::new(&storage.peers, replicaset_weights)
+        }
+
+        pub fn new(peers: &Peers, replicaset_weights: ReplicasetWeights) -> Result<Self, Error> {
             use peer_field::{InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, ReplicasetId, IsMaster};
             type Fields = (InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, ReplicasetId, IsMaster);
-            let replicaset_weights = storage.state.replicaset_weights()?;
             let mut sharding: HashMap<String, Replicaset> = HashMap::new();
-            for (id, uuid, addr, rset, rset_id, is_master) in storage.peers.peers_fields::<Fields>()? {
+            for (id, uuid, addr, rset, rset_id, is_master) in peers.peers_fields::<Fields>()? {
                 let replicaset = sharding.entry(rset).or_insert_with(||
                     Replicaset::with_weight(replicaset_weights.get(&rset_id).copied())
                 );
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index 7854cf321c..c1db966409 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -193,6 +193,14 @@ impl State {
     pub fn vshard_bootstrapped(&self) -> tarantool::Result<bool> {
         Ok(self.get(StateKey::VshardBootstrapped)?.unwrap_or_default())
     }
+
+    #[inline]
+    pub fn replication_factor(&self) -> tarantool::Result<usize> {
+        let res = self
+            .get(StateKey::ReplicationFactor)?
+            .expect("replication_factor must be set at boot");
+        Ok(res)
+    }
 }
 
 impl Clone for State {
--
GitLab
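
Note on the weight-update rule this patch introduces: in vshard a replicaset with
weight 0 receives no buckets, so the governor now raises a replicaset's weight
from 0 to 1 once it has at least `replication_factor` members, pushes the new
weights to every instance via `sharding::Request { weights: Some(..) }`, and
persists them with `OpDML::replace` on the cluster state space. Below is a rough
standalone sketch of the rule itself, mirroring `get_new_weights` from the patch;
the names `updated_weights` and `peer_replicaset_ids` are illustrative and not
part of the codebase.

use std::collections::HashMap;

type ReplicasetWeights = HashMap<String, f64>;

// Count peers per replicaset and raise the weight of any replicaset that has
// reached the replication factor but still has weight 0. Returns None when
// nothing changed. Assumes every replicaset already has a weight entry, as in
// the patch.
fn updated_weights(
    peer_replicaset_ids: &[String],
    mut weights: ReplicasetWeights,
    replication_factor: usize,
) -> Option<ReplicasetWeights> {
    let mut sizes: HashMap<&str, usize> = HashMap::new();
    let mut changed = false;
    for rid in peer_replicaset_ids {
        let size = sizes.entry(rid.as_str()).or_insert(0);
        *size += 1;
        if *size >= replication_factor && weights[rid.as_str()] == 0.0 {
            *weights.get_mut(rid.as_str()).unwrap() = 1.0;
            changed = true;
        }
    }
    changed.then_some(weights)
}

fn main() {
    let peers = vec!["r1".to_string(), "r1".to_string(), "r2".to_string()];
    let weights = ReplicasetWeights::from([("r1".into(), 0.0), ("r2".into(), 0.0)]);
    // With replication_factor = 2 only "r1" is full, so only its weight becomes 1.0.
    println!("{:?}", updated_weights(&peers, weights, 2));
}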