Commit f5835629 authored and committed by Georgy Moshkin

feat(sharding): finalize vshard

parent 02e7ad7e
Merge request !299: Feat/poor mans vshard
@@ -14,7 +14,6 @@ use ::tarantool::error::TransactionError;
 use ::tarantool::fiber;
 use ::tarantool::fiber::{Cond, Mutex};
 use ::tarantool::proc;
-use tarantool::space::UpdateOps;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::cell::Cell;
@@ -24,17 +23,18 @@ use std::convert::TryFrom;
 use std::rc::Rc;
 use std::time::Duration;
 use std::time::Instant;
+use tarantool::space::UpdateOps;
 use crate::kvcell::KVCell;
 use crate::r#loop::{FlowControl, Loop};
 use crate::stringify_cfunc;
-use crate::traft::storage::ClusterSpace;
 use crate::traft::ContextCoercion as _;
 use crate::traft::InstanceId;
 use crate::traft::Peer;
 use crate::traft::RaftId;
 use crate::traft::RaftIndex;
 use crate::traft::RaftTerm;
+use crate::traft::storage::ClusterSpace;
 use crate::warn_or_panic;
 use crate::{unwrap_ok_or, unwrap_some_or};
 use ::tarantool::util::IntoClones as _;
@@ -48,9 +48,10 @@ use crate::traft::event;
 use crate::traft::event::Event;
 use crate::traft::failover;
 use crate::traft::notify::Notify;
+use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
 use crate::traft::rpc::{replication, sharding};
 use crate::traft::storage::peer_field;
-use crate::traft::storage::StateKey;
+use crate::traft::storage::{State, StateKey};
 use crate::traft::ConnectionPool;
 use crate::traft::LogicalClock;
 use crate::traft::Op;
@@ -1048,6 +1049,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         ////////////////////////////////////////////////////////////////////////
         // raft sync
+        // TODO: putting each stage in a different function
+        // will make the control flow more readable
         let to_sync = peers
             .iter()
             .find(|peer| peer.has_grades(CurrentGrade::Offline, TargetGrade::Online));
@@ -1101,12 +1104,16 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         ////////////////////////////////////////////////////////////////////////
         // replication
+        // TODO: putting each stage in a different function
+        // will make error handling much easier
         let to_replicate = peers
             .iter()
             .find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
         if let Some(peer) = to_replicate {
             let replicaset_id = &peer.replicaset_id;
             let replicaset_iids = unwrap_ok_or!(
+                // TODO: filter out Offline & Expelled peers
+                // TODO: use `peers` instead
                 storage.peers.replicaset_fields::<peer_field::InstanceId>(replicaset_id),
                 Err(e) => {
                     tlog!(Warning, "failed reading replicaset instances: {e}";
@@ -1201,8 +1208,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             );
             // TODO: don't hard code timeout
-            event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1))
-                .unwrap();
+            event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
             continue 'governor;
         }
     }
@@ -1214,11 +1220,15 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         ////////////////////////////////////////////////////////////////////////
         // sharding
+        // TODO: putting each stage in a different function
+        // will make error handling much easier
         let need_sharding = peers
             .iter()
             .any(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
         if need_sharding {
             let peer_ids = unwrap_ok_or!(
+                // TODO: filter out Offline & Expelled peers
+                // TODO: use `peers` instead
                 storage.peers.peers_fields::<peer_field::InstanceId>(),
                 Err(e) => {
                     tlog!(Warning, "failed reading peer instances: {e}");
@@ -1236,13 +1246,14 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 sharding::Request {
                     leader_id: raft_id,
                     term,
+                    weights: None,
                 },
                 // TODO: don't hard code timeout
                 Duration::from_secs(3),
             );
             let res = unwrap_ok_or!(res,
                 Err(e) => {
-                    tlog!(Warning, "failed to configure sharding: {e}");
+                    tlog!(Warning, "failed to initialize sharding: {e}");
                     continue 'governor;
                 }
             );
@@ -1264,7 +1275,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     );
                 }
                 Err(e) => {
-                    tlog!(Warning, "failed to configure sharding: {e}";
+                    tlog!(Warning, "failed to initialize sharding: {e}";
                         "instance_id" => &*peer_iid,
                     );
@@ -1276,29 +1287,82 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 }
             }
-            tlog!(Info, "configured sharding");
+            tlog!(Info, "sharding is initialized");
             continue 'governor;
         }

         ////////////////////////////////////////////////////////////////////////
-        // ???
-        // TODO
-        let to_online = peers
+        // sharding weights
+        let maybe_need_weights_update = peers
             .iter()
-            .find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
-        if let Some(peer) = to_online {
-            let instance_id = peer.instance_id.clone();
-            let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
-            let req = UpdatePeerRequest::new(instance_id, cluster_id)
-                .with_current_grade(CurrentGrade::Online);
-            let res = node.handle_topology_request_and_wait(req.into());
+            .any(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
+        if maybe_need_weights_update {
+            let res = if let Some(new_weights) = get_new_weights(&peers, &storage.state) {
+                (|| -> Result<(), Error> {
+                    let res = call_all(
+                        &mut pool,
+                        peers.iter().map(|peer| peer.instance_id.clone()),
+                        sharding::Request {
+                            leader_id: raft_id,
+                            term,
+                            weights: Some(new_weights.clone()),
+                        },
+                        // TODO: don't hard code timeout
+                        Duration::from_secs(3),
+                    )?;
+                    let cluster_id = storage.raft.cluster_id()?.unwrap();
+                    for (peer_iid, resp) in res {
+                        let cluster_id = cluster_id.clone();
+                        let peer_iid_2 = peer_iid.clone();
+                        resp.and_then(move |sharding::Response {}| {
+                            let req = UpdatePeerRequest::new(peer_iid_2, cluster_id)
+                                .with_current_grade(CurrentGrade::Online);
+                            node.handle_topology_request_and_wait(req.into())
+                        })?;
+                        // TODO: change `Info` to `Debug`
+                        tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);
+                    }
+                    node.propose_and_wait(
+                        // TODO: OpDML::update with just the changes
+                        traft::OpDML::replace(
+                            ClusterSpace::State,
+                            &(StateKey::ReplicasetWeights, new_weights),
+                        )?,
+                        // TODO: don't hard code the timeout
+                        Duration::from_secs(3),
+                    )??;
+                    Ok(())
+                })()
+            } else {
+                (|| -> Result<(), Error> {
+                    let cluster_id = storage.raft.cluster_id()?.unwrap();
+                    let to_online = peers.iter().filter(|peer| {
+                        peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online)
+                    });
+                    for Peer { instance_id, .. } in to_online {
+                        let cluster_id = cluster_id.clone();
+                        let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id)
+                            .with_current_grade(CurrentGrade::Online);
+                        node.handle_topology_request_and_wait(req.into())?;
+                        // TODO: change `Info` to `Debug`
+                        tlog!(Info, "peer is online"; "instance_id" => &**instance_id);
+                    }
+                    Ok(())
+                })()
+            };

             if let Err(e) = res {
-                tlog!(Warning, "failed to set peer online: {e}";
-                    "instance_id" => &*peer.instance_id,
-                );
+                tlog!(Warning, "updating sharding weights failed: {e}");
+
+                // TODO: don't hard code timeout
+                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                continue 'governor;
             }
+
+            tlog!(Info, "sharding is configured");
             continue 'governor;
         }
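For orientation: the governor stages touched in this hunk advance each peer through the grade sequence Offline → RaftSynced → Replicated → ShardingInitialized → Online, as visible in the `has_grades(...)` filters above. A minimal sketch of that progression; the enum below is a simplified stand-in for the crate's `CurrentGrade`, not its real definition:

```rust
/// Simplified stand-in for the crate's `CurrentGrade` (illustrative only).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Grade {
    Offline,
    RaftSynced,
    Replicated,
    ShardingInitialized,
    Online,
}

/// Which governor stage handles a peer at this grade whose target is Online,
/// and which grade that stage promotes it to.
fn next_stage(current: Grade) -> Option<(&'static str, Grade)> {
    match current {
        Grade::Offline => Some(("raft sync", Grade::RaftSynced)),
        Grade::RaftSynced => Some(("replication", Grade::Replicated)),
        Grade::Replicated => Some(("sharding", Grade::ShardingInitialized)),
        Grade::ShardingInitialized => Some(("sharding weights", Grade::Online)),
        Grade::Online => None,
    }
}

fn main() {
    let mut grade = Grade::Offline;
    while let Some((stage, next)) = next_stage(grade) {
        println!("{grade:?} --[{stage}]--> {next:?}");
        grade = next;
    }
}
```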
@@ -1350,6 +1414,22 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         Ok(rx.into_iter().take(peer_count).collect())
     }

+    fn get_new_weights(peers: &[Peer], state: &State) -> Option<ReplicasetWeights> {
+        let replication_factor = state.replication_factor().expect("storage error");
+        let mut replicaset_weights = state.replicaset_weights().expect("storage error");
+        let mut replicaset_sizes = HashMap::new();
+        let mut weights_changed = false;
+        for Peer { replicaset_id, .. } in peers {
+            let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
+            *replicaset_size += 1;
+            if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
+                weights_changed = true;
+                *replicaset_weights.get_mut(replicaset_id).unwrap() = 1.;
+            }
+        }
+        weights_changed.then_some(replicaset_weights)
+    }
 }

 fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
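The new `get_new_weights` helper only flips a replicaset's weight from 0 to 1 once the replicaset has accumulated `replication_factor` peers, and returns `None` when nothing changed. A self-contained sketch of that rule, with plain `HashMap`s standing in for the crate's `ReplicasetWeights` and peer list (names here are illustrative):

```rust
use std::collections::HashMap;

// Simplified model of `get_new_weights`: once a replicaset reaches
// `replication_factor` peers, its vshard weight is bumped from 0.0 to
// 1.0 so the rebalancer starts routing buckets to it.
fn new_weights(
    peer_replicasets: &[&str], // replicaset_id of every peer
    mut weights: HashMap<String, f64>,
    replication_factor: usize,
) -> Option<HashMap<String, f64>> {
    let mut sizes: HashMap<&str, usize> = HashMap::new();
    let mut changed = false;
    for &rset in peer_replicasets {
        let size = sizes.entry(rset).or_insert(0);
        *size += 1;
        if *size >= replication_factor && weights[rset] == 0.0 {
            *weights.get_mut(rset).unwrap() = 1.0;
            changed = true;
        }
    }
    changed.then_some(weights)
}

fn main() {
    let weights = HashMap::from([("r1".into(), 0.0), ("r2".into(), 0.0)]);
    // With replication_factor = 2, only the full replicaset "r1" is promoted.
    let updated = new_weights(&["r1", "r1", "r2"], weights, 2).unwrap();
    assert_eq!(updated["r1"], 1.0);
    assert_eq!(updated["r2"], 0.0);
}
```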
...
@@ -17,7 +17,11 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
     let _ = req.term;
     let storage = &node.storage;

-    let cfg = cfg::Cfg::from_storage(&storage)?;
+    let cfg = if let Some(weights) = req.weights {
+        cfg::Cfg::new(&storage.peers, weights)?
+    } else {
+        cfg::Cfg::from_storage(storage)?
+    };

     let lua = ::tarantool::lua_state();
     // TODO: fix user's permissions
@@ -48,6 +52,7 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
 pub struct Request {
     pub leader_id: RaftId,
     pub term: RaftTerm,
+    pub weights: Option<cfg::ReplicasetWeights>,
 }

 impl ::tarantool::tuple::Encode for Request {}
@@ -67,7 +72,7 @@ impl super::Request for Request {
 pub mod cfg {
     use crate::traft::error::Error;
     use crate::traft::storage::peer_field;
-    use crate::traft::storage::Storage;
+    use crate::traft::storage::{Peers, Storage};
     use ::tarantool::tlua;
@@ -118,12 +123,17 @@ pub mod cfg {
     pub type Weight = f64;

     impl Cfg {
+        #[inline]
         pub fn from_storage(storage: &Storage) -> Result<Self, Error> {
+            let replicaset_weights = storage.state.replicaset_weights()?;
+            Self::new(&storage.peers, replicaset_weights)
+        }
+
+        pub fn new(peers: &Peers, replicaset_weights: ReplicasetWeights) -> Result<Self, Error> {
             use peer_field::{InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, ReplicasetId, IsMaster};
             type Fields = (InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, ReplicasetId, IsMaster);

-            let replicaset_weights = storage.state.replicaset_weights()?;
             let mut sharding: HashMap<String, Replicaset> = HashMap::new();
-            for (id, uuid, addr, rset, rset_id, is_master) in storage.peers.peers_fields::<Fields>()? {
+            for (id, uuid, addr, rset, rset_id, is_master) in peers.peers_fields::<Fields>()? {
                 let replicaset = sharding.entry(rset).or_insert_with(||
                     Replicaset::with_weight(replicaset_weights.get(&rset_id).copied())
                 );
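For context, `Cfg::new` assembles the table vshard expects: replicaset uuid → replicaset config, with each replicaset's weight looked up by replicaset id. A rough, illustrative model with simplified types; the real `Replicaset` struct in this module carries more per-instance detail (uuid, address, master flag):

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the module's `Replicaset` config entry.
#[derive(Debug, Default)]
struct Replicaset {
    weight: Option<f64>,
    replicas: Vec<String>, // instance uuids, simplified
}

fn build_sharding(
    peers: &[(&str, &str, &str)], // (instance_uuid, replicaset_uuid, replicaset_id)
    weights: &HashMap<String, f64>,
) -> HashMap<String, Replicaset> {
    let mut sharding: HashMap<String, Replicaset> = HashMap::new();
    for &(instance_uuid, rset_uuid, rset_id) in peers {
        // The weight is resolved once, when the replicaset entry is created.
        let replicaset = sharding.entry(rset_uuid.to_string()).or_insert_with(|| Replicaset {
            weight: weights.get(rset_id).copied(),
            ..Default::default()
        });
        replicaset.replicas.push(instance_uuid.to_string());
    }
    sharding
}

fn main() {
    let weights = HashMap::from([("r1".to_string(), 1.0)]);
    let peers = [("i1-uuid", "r1-uuid", "r1"), ("i2-uuid", "r1-uuid", "r1")];
    println!("{:#?}", build_sharding(&peers, &weights));
}
```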
...
@@ -193,6 +193,14 @@ impl State {
     pub fn vshard_bootstrapped(&self) -> tarantool::Result<bool> {
         Ok(self.get(StateKey::VshardBootstrapped)?.unwrap_or_default())
     }
+
+    #[inline]
+    pub fn replication_factor(&self) -> tarantool::Result<usize> {
+        let res = self
+            .get(StateKey::ReplicationFactor)?
+            .expect("replication_factor must be set at boot");
+        Ok(res)
+    }
 }

 impl Clone for State {
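A minimal sketch of the access pattern `replication_factor` relies on, assuming a simple key-value state (the real `State::get` reads tuples from a Tarantool space; this model uses an in-memory map):

```rust
use std::collections::HashMap;

/// Toy stand-in for the `State` space: string keys to integer values.
struct State(HashMap<String, usize>);

impl State {
    /// Missing keys yield `None`, mirroring `State::get`.
    fn get(&self, key: &str) -> Option<usize> {
        self.0.get(key).copied()
    }

    /// `replication_factor` is written once at cluster boot, so a missing
    /// value is a broken invariant and panicking here is deliberate.
    fn replication_factor(&self) -> usize {
        self.get("replication_factor")
            .expect("replication_factor must be set at boot")
    }
}

fn main() {
    let state = State(HashMap::from([("replication_factor".to_string(), 2)]));
    assert_eq!(state.replication_factor(), 2);
}
```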
...