From 5040442178aa3cee97f974985555f74e2c7a86b2 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 6 Dec 2022 09:48:08 +0300 Subject: [PATCH] refactor(governor): traft::node::raft_conf_change_loop -> governor::governor_loop --- src/governor/mod.rs | 720 ++++++++++++++++++++++++++++++++++++- src/traft/mod.rs | 4 +- src/traft/node.rs | 724 +------------------------------------- src/traft/raft_storage.rs | 4 +- 4 files changed, 732 insertions(+), 720 deletions(-) diff --git a/src/governor/mod.rs b/src/governor/mod.rs index f32005da51..3d525509d3 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -1,7 +1,725 @@ +use std::borrow::Cow; +use std::cell::Cell; +use std::collections::{HashMap, HashSet}; +use std::iter::repeat; +use std::rc::Rc; +use std::time::Duration; + +use ::tarantool::fiber; +use ::tarantool::space::UpdateOps; +use ::tarantool::util::IntoClones as _; + +use crate::event::{self, Event}; +use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; +use crate::tlog; +use crate::traft::error::Error; +use crate::traft::network::{ConnectionPool, IdOfInstance}; +use crate::traft::node::global; +use crate::traft::node::Status; +use crate::traft::raft_storage::RaftSpaceAccess; +use crate::traft::rpc; +use crate::traft::rpc::sharding::cfg::ReplicasetWeights; +use crate::traft::rpc::{replication, sharding, sync, update_instance}; +use crate::traft::OpDML; +use crate::traft::Result; +use crate::traft::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant}; +use crate::traft::{Instance, Replicaset}; + pub(crate) mod cc; pub(crate) mod migration; pub(crate) use cc::raft_conf_change; pub(crate) use migration::waiting_migrations; -// TODO place governor code here +pub(crate) fn governor_loop( + status: Rc<Cell<Status>>, + storage: Clusterwide, + raft_storage: RaftSpaceAccess, +) { + let mut pool = ConnectionPool::builder(storage.clone()) + .call_timeout(Duration::from_secs(1)) + .connect_timeout(Duration::from_millis(500)) + .inactivity_timeout(Duration::from_secs(60)) + .build(); + + // TODO: don't hardcode this + const SYNC_TIMEOUT: Duration = Duration::from_secs(10); + + 'governor: loop { + if !status.get().raft_state.is_leader() { + event::wait(Event::StatusChanged).expect("Events system must be initialized"); + continue 'governor; + } + + let instances = storage.instances.all_instances().unwrap(); + let term = status.get().term; + let cluster_id = raft_storage.cluster_id().unwrap().unwrap(); + let node = global().expect("must be initialized"); + + //////////////////////////////////////////////////////////////////////// + // conf change + let voters = raft_storage.voters().unwrap().unwrap_or_default(); + let learners = raft_storage.learners().unwrap().unwrap_or_default(); + if let Some(conf_change) = raft_conf_change(&instances, &voters, &learners) { + // main_loop gives the warranty that every ProposeConfChange + // will sometimes be handled and there's no need in timeout. + // It also guarantees that the notification will arrive only + // after the node leaves the joint state. + tlog!(Info, "proposing conf_change"; "cc" => ?conf_change); + if let Err(e) = node.propose_conf_change_and_wait(term, conf_change) { + tlog!(Warning, "failed proposing conf_change: {e}"); + fiber::sleep(Duration::from_secs(1)); + } + continue 'governor; + } + + //////////////////////////////////////////////////////////////////////// + // offline/expel + let to_offline = instances + .iter() + .filter(|instance| instance.current_grade != CurrentGradeVariant::Offline) + // TODO: process them all, not just the first one + .find(|instance| { + let (target, current) = ( + instance.target_grade.variant, + instance.current_grade.variant, + ); + matches!(target, TargetGradeVariant::Offline) + || !matches!(current, CurrentGradeVariant::Expelled) + && matches!(target, TargetGradeVariant::Expelled) + }); + if let Some(instance) = to_offline { + tlog!( + Info, + "processing {} {} -> {}", + instance.instance_id, + instance.current_grade, + instance.target_grade + ); + + // transfer leadership, if we're the one who goes offline + if instance.raft_id == node.raft_id { + if let Some(new_leader) = maybe_responding(&instances).find(|instance| { + // FIXME: linear search + voters.contains(&instance.raft_id) + }) { + tlog!( + Info, + "transferring leadership to {}", + new_leader.instance_id + ); + node.transfer_leadership_and_yield(new_leader.raft_id); + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + } + + let replicaset_id = &instance.replicaset_id; + // choose a new replicaset master if needed + let res = (|| -> Result<_> { + let replicaset = storage.replicasets.get(replicaset_id)?; + if replicaset + .map(|r| r.master_id == instance.instance_id) + .unwrap_or(false) + { + let new_master = + maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id); + if let Some(instance) = new_master { + let mut ops = UpdateOps::new(); + ops.assign("master_id", &instance.instance_id)?; + + let op = + OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?; + tlog!(Info, "proposing replicaset master change"; "op" => ?op); + // TODO: don't hard code the timeout + node.propose_and_wait(op, Duration::from_secs(3))??; + } else { + tlog!(Info, "skip proposing replicaset master change"); + } + } + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, "failed proposing replicaset master change: {e}"); + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + + // reconfigure vshard storages and routers + let res = (|| -> Result<_> { + let commit = raft_storage.commit()?.unwrap(); + let reqs = maybe_responding(&instances) + .filter(|instance| { + instance.current_grade == CurrentGradeVariant::ShardingInitialized + || instance.current_grade == CurrentGradeVariant::Online + }) + .map(|instance| { + tlog!(Info, + "calling rpc::sharding"; + "instance_id" => %instance.instance_id + ); + ( + instance.instance_id.clone(), + sharding::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + bootstrap: false, + }, + ) + }); + // TODO: don't hard code timeout + let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + for (_, resp) in res { + let sharding::Response {} = resp?; + } + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, "failed calling rpc::sharding: {e}"); + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + + // update instance's CurrentGrade + let req = + update_instance::Request::new(instance.instance_id.clone(), cluster_id.clone()) + .with_current_grade(instance.target_grade.into()); + tlog!(Info, + "handling update_instance::Request"; + "current_grade" => %req.current_grade.expect("just set"), + "instance_id" => %req.instance_id, + ); + if let Err(e) = node.handle_update_instance_request_and_wait(req) { + tlog!(Warning, + "failed handling update_instance::Request: {e}"; + "instance_id" => %instance.instance_id, + ); + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + + let replicaset_instances = storage + .instances + .replicaset_instances(replicaset_id) + .expect("storage error") + .filter(|instance| !instance.is_expelled()) + .collect::<Vec<_>>(); + let may_respond = replicaset_instances + .iter() + .filter(|instance| instance.may_respond()); + // Check if it makes sense to call box.ctl.promote, + // otherwise we risk unpredictable delays + if replicaset_instances.len() / 2 + 1 > may_respond.count() { + tlog!(Warning, + "replicaset lost quorum"; + "replicaset_id" => %replicaset_id, + ); + continue 'governor; + } + + let res = (|| -> Result<_> { + // Promote the replication leader again + // because of tarantool bugs + if let Some(replicaset) = storage.replicasets.get(replicaset_id)? { + tlog!(Info, + "calling rpc::replication::promote"; + "instance_id" => %replicaset.master_id + ); + let commit = raft_storage.commit()?.unwrap(); + pool.call_and_wait_timeout( + &replicaset.master_id, + replication::promote::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + }, + // TODO: don't hard code timeout + Duration::from_secs(3), + )?; + } + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, + "failed calling rpc::replication::promote: {e}"; + "replicaset_id" => %replicaset_id, + ); + } + + continue 'governor; + } + + //////////////////////////////////////////////////////////////////////// + // raft sync + // TODO: putting each stage in a different function + // will make the control flow more readable + let to_sync = instances.iter().find(|instance| { + instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online) + || instance.is_reincarnated() + }); + if let Some(instance) = to_sync { + let (rx, tx) = fiber::Channel::new(1).into_clones(); + let commit = raft_storage.commit().unwrap().unwrap(); + pool.call( + &instance.raft_id, + sync::Request { + commit, + timeout: SYNC_TIMEOUT, + }, + move |res| tx.send(res).expect("mustn't fail"), + ) + .expect("shouldn't fail"); + let res = rx.recv().expect("ought not fail"); + let res = res.and_then(|sync::Response { commit }| { + // TODO: change `Info` to `Debug` + tlog!(Info, "instance synced"; + "commit" => commit, + "instance_id" => &*instance.instance_id, + ); + + let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::raft_synced( + instance.target_grade.incarnation, + )); + global() + .expect("can't be deinitialized") + .handle_update_instance_request_and_wait(req) + }); + match res { + Ok(()) => { + tlog!(Info, "raft sync processed"); + } + Err(e) => { + tlog!(Warning, "raft sync failed: {e}"; + "instance_id" => %instance.instance_id, + ); + + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)) + .unwrap(); + } + } + + continue 'governor; + } + + //////////////////////////////////////////////////////////////////////// + // replication + let to_replicate = instances + .iter() + // TODO: find all such instances in a given replicaset, + // not just the first one + .find(|instance| { + instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online) + }); + if let Some(instance) = to_replicate { + let replicaset_id = &instance.replicaset_id; + let replicaset_iids = maybe_responding(&instances) + .filter(|instance| instance.replicaset_id == replicaset_id) + .map(|instance| instance.instance_id.clone()) + .collect::<Vec<_>>(); + + let res = (|| -> Result<_> { + let commit = raft_storage.commit()?.unwrap(); + let reqs = replicaset_iids + .iter() + .cloned() + .zip(repeat(replication::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + replicaset_instances: replicaset_iids.clone(), + replicaset_id: replicaset_id.clone(), + })); + // TODO: don't hard code timeout + let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + + for (instance_id, resp) in res { + let replication::Response { lsn } = resp?; + // TODO: change `Info` to `Debug` + tlog!(Info, "configured replication with instance"; + "instance_id" => %instance_id, + "lsn" => lsn, + ); + } + + let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::replicated( + instance.target_grade.incarnation, + )); + node.handle_update_instance_request_and_wait(req)?; + + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, "failed to configure replication: {e}"); + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + + let res = (|| -> Result<_> { + let master_id = + if let Some(replicaset) = storage.replicasets.get(&instance.replicaset_id)? { + Cow::Owned(replicaset.master_id) + } else { + let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; + let req = OpDML::insert( + ClusterwideSpace::Replicaset, + &Replicaset { + replicaset_id: instance.replicaset_id.clone(), + replicaset_uuid: instance.replicaset_uuid.clone(), + master_id: instance.instance_id.clone(), + weight: if vshard_bootstrapped { 0. } else { 1. }, + current_schema_version: 0, + }, + )?; + // TODO: don't hard code the timeout + node.propose_and_wait(req, Duration::from_secs(3))??; + Cow::Borrowed(&instance.instance_id) + }; + + let commit = raft_storage.commit()?.unwrap(); + pool.call_and_wait_timeout( + &*master_id, + replication::promote::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + }, + // TODO: don't hard code timeout + Duration::from_secs(3), + )?; + tlog!(Debug, "promoted replicaset master"; + "instance_id" => %master_id, + "replicaset_id" => %instance.replicaset_id, + ); + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, "failed to promote replicaset master: {e}"; + "replicaset_id" => %replicaset_id, + ); + } + + tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id); + + continue 'governor; + } + + //////////////////////////////////////////////////////////////////////// + // init sharding + let to_shard = instances.iter().find(|instance| { + instance.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online) + }); + if let Some(instance) = to_shard { + let res = (|| -> Result<()> { + let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; + let commit = raft_storage.commit()?.unwrap(); + let reqs = maybe_responding(&instances).map(|instance| { + ( + instance.instance_id.clone(), + sharding::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id, + }, + ) + }); + // TODO: don't hard code timeout + let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + + for (instance_id, resp) in res { + let sharding::Response {} = resp?; + + // TODO: change `Info` to `Debug` + tlog!(Info, "initialized sharding with instance"; + "instance_id" => %instance_id, + ); + } + + let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::sharding_initialized( + instance.target_grade.incarnation, + )); + node.handle_update_instance_request_and_wait(req)?; + + if !vshard_bootstrapped { + // TODO: if this fails, it will only rerun next time vshard + // gets reconfigured + node.propose_and_wait( + OpDML::replace( + ClusterwideSpace::Property, + &(PropertyName::VshardBootstrapped, true), + )?, + // TODO: don't hard code the timeout + Duration::from_secs(3), + )??; + } + + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, "failed to initialize sharding: {e}"); + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + + let res = (|| -> Result<()> { + // Promote the replication leaders again + // because of tarantool bugs + let replicasets = storage.replicasets.iter()?; + let masters = replicasets.map(|r| r.master_id).collect::<HashSet<_>>(); + let commit = raft_storage.commit()?.unwrap(); + let reqs = maybe_responding(&instances) + .filter(|instance| masters.contains(&instance.instance_id)) + .map(|instance| instance.instance_id.clone()) + .zip(repeat(replication::promote::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + })); + // TODO: don't hard code timeout + let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + for (instance_id, resp) in res { + resp?; + tlog!(Debug, "promoted replicaset master"; "instance_id" => %instance_id); + } + Ok(()) + })(); + if let Err(e) = res { + tlog!(Warning, "failed to promote replicaset masters: {e}"); + } + + tlog!(Info, "sharding is initialized"); + + continue 'governor; + } + + //////////////////////////////////////////////////////////////////////// + // sharding weights + let to_update_weights = instances.iter().find(|instance| { + instance.has_grades( + CurrentGradeVariant::ShardingInitialized, + TargetGradeVariant::Online, + ) + }); + if let Some(instance) = to_update_weights { + let res = if let Some(added_weights) = + get_weight_changes(maybe_responding(&instances), &storage) + { + (|| -> Result<()> { + for (replicaset_id, weight) in added_weights { + let mut ops = UpdateOps::new(); + ops.assign("weight", weight)?; + node.propose_and_wait( + OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?, + // TODO: don't hard code the timeout + Duration::from_secs(3), + )??; + } + + let instance_ids = + maybe_responding(&instances).map(|instance| instance.instance_id.clone()); + let commit = raft_storage.commit()?.unwrap(); + let reqs = instance_ids.zip(repeat(sharding::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + bootstrap: false, + })); + // TODO: don't hard code timeout + let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + + for (instance_id, resp) in res { + resp?; + // TODO: change `Info` to `Debug` + tlog!(Info, "instance is online"; "instance_id" => %instance_id); + } + + let req = + update_instance::Request::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::online( + instance.target_grade.incarnation, + )); + node.handle_update_instance_request_and_wait(req)?; + Ok(()) + })() + } else { + (|| -> Result<()> { + let to_online = instances.iter().filter(|instance| { + instance.has_grades( + CurrentGradeVariant::ShardingInitialized, + TargetGradeVariant::Online, + ) + }); + for Instance { + instance_id, + target_grade, + .. + } in to_online + { + let cluster_id = cluster_id.clone(); + let req = update_instance::Request::new(instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::online(target_grade.incarnation)); + node.handle_update_instance_request_and_wait(req)?; + // TODO: change `Info` to `Debug` + tlog!(Info, "instance is online"; "instance_id" => %instance_id); + } + Ok(()) + })() + }; + if let Err(e) = res { + tlog!(Warning, "updating sharding weights failed: {e}"); + + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + + tlog!(Info, "sharding is configured"); + + continue 'governor; + } + + //////////////////////////////////////////////////////////////////////// + // applying migrations + let desired_schema_version = storage.state.desired_schema_version().unwrap(); + let replicasets = storage.replicasets.iter().unwrap().collect::<Vec<_>>(); + let mut migrations = storage.migrations.iter().unwrap().collect::<Vec<_>>(); + let commit = raft_storage.commit().unwrap().unwrap(); + for (mid, rids) in waiting_migrations(&mut migrations, &replicasets, desired_schema_version) + { + let migration = storage.migrations.get(mid).unwrap().unwrap(); + for rid in rids { + let replicaset = storage + .replicasets + .get(rid.to_string().as_str()) + .unwrap() + .unwrap(); + let instance = storage.instances.get(&replicaset.master_id).unwrap(); + let req = rpc::migration::apply::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + migration_id: migration.id, + }; + let res = pool.call_and_wait(&instance.raft_id, req); + match res { + Ok(_) => { + let mut ops = UpdateOps::new(); + ops.assign("current_schema_version", migration.id).unwrap(); + let op = OpDML::update( + ClusterwideSpace::Replicaset, + &[replicaset.replicaset_id.clone()], + ops, + ) + .unwrap(); + node.propose_and_wait(op, Duration::MAX).unwrap().unwrap(); + tlog!( + Info, + "Migration {0} applied to replicaset {1}", + migration.id, + replicaset.replicaset_id + ); + } + Err(e) => { + tlog!( + Warning, + "Could not apply migration {0} to replicaset {1}, error: {2}", + migration.id, + replicaset.replicaset_id, + e + ); + continue 'governor; + } + } + } + } + event::broadcast(Event::MigrateDone); + + event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged]) + .expect("Events system must be initialized"); + } +} + +#[allow(clippy::type_complexity)] +fn call_all<R, I>( + pool: &mut ConnectionPool, + reqs: impl IntoIterator<Item = (I, R)>, + timeout: Duration, +) -> Result<Vec<(I, Result<R::Response>)>> +where + R: rpc::Request, + I: IdOfInstance + 'static, +{ + // TODO: this crap is only needed to wait until results of all + // the calls are ready. There are several ways to rafactor this: + // - we could use a std-style channel that unblocks the reading end + // once all the writing ends have dropped + // (fiber::Channel cannot do that for now) + // - using the std Futures we could use futures::join! + // + // Those things aren't implemented yet, so this is what we do + let reqs = reqs.into_iter().collect::<Vec<_>>(); + if reqs.is_empty() { + return Ok(vec![]); + } + static mut SENT_COUNT: usize = 0; + unsafe { SENT_COUNT = 0 }; + let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones(); + let instance_count = reqs.len(); + let (rx, tx) = fiber::Channel::new(instance_count as _).into_clones(); + for (id, req) in reqs { + let tx = tx.clone(); + let cond_tx = cond_tx.clone(); + let id_copy = id.clone(); + pool.call(&id, req, move |res| { + tx.send((id_copy, res)).expect("mustn't fail"); + unsafe { SENT_COUNT += 1 }; + if unsafe { SENT_COUNT } == instance_count { + cond_tx.signal() + } + }) + .expect("shouldn't fail"); + } + // TODO: don't hard code timeout + if !cond_rx.wait_timeout(timeout) { + return Err(Error::Timeout); + } + + Ok(rx.into_iter().take(instance_count).collect()) +} + +#[inline(always)] +fn get_weight_changes<'p>( + instances: impl IntoIterator<Item = &'p Instance>, + storage: &Clusterwide, +) -> Option<ReplicasetWeights> { + let replication_factor = storage.state.replication_factor().expect("storage error"); + let replicaset_weights = storage.replicasets.weights().expect("storage error"); + let mut replicaset_sizes = HashMap::new(); + let mut weight_changes = HashMap::new(); + for instance @ Instance { replicaset_id, .. } in instances { + if !instance.may_respond() { + continue; + } + let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0); + *replicaset_size += 1; + if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. { + weight_changes.entry(replicaset_id.clone()).or_insert(1.); + } + } + (!weight_changes.is_empty()).then_some(weight_changes) +} + +#[inline(always)] +fn maybe_responding(instances: &[Instance]) -> impl Iterator<Item = &Instance> { + instances.iter().filter(|instance| instance.may_respond()) +} diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 4369e00399..8dfbd7635a 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -2,10 +2,10 @@ pub mod error; pub mod event; -mod network; +pub(crate) mod network; pub mod node; pub mod notify; -mod raft_storage; +pub(crate) mod raft_storage; pub mod rpc; pub mod topology; diff --git a/src/traft/node.rs b/src/traft/node.rs index 32d1331f01..78b4913d95 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -16,27 +16,21 @@ use ::tarantool::fiber::r#async::block_on; use ::tarantool::fiber::r#async::oneshot; use ::tarantool::fiber::{Cond, Mutex}; use ::tarantool::proc; -use ::tarantool::space::UpdateOps; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; -use std::borrow::Cow; use std::cell::Cell; use std::collections::HashMap; -use std::collections::HashSet; use std::convert::TryFrom; -use std::iter::repeat; use std::rc::Rc; use std::time::Duration; use std::time::Instant; -use crate::governor::raft_conf_change; -use crate::governor::waiting_migrations; +use crate::governor::governor_loop; use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::stringify_cfunc; -use crate::traft::rpc; use crate::traft::ContextCoercion as _; use crate::traft::Instance; use crate::traft::RaftId; @@ -45,7 +39,6 @@ use crate::traft::RaftTerm; use crate::traft::{OpDML, OpPersistInstance}; use crate::unwrap_some_or; use crate::warn_or_panic; -use ::tarantool::util::IntoClones as _; use protobuf::Message as _; use crate::tlog; @@ -54,17 +47,15 @@ use crate::traft::error::Error; use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::{notification, Notifier, Notify}; -use crate::traft::rpc::sharding::cfg::ReplicasetWeights; -use crate::traft::rpc::{join, replication, sharding, sync, update_instance}; +use crate::traft::rpc::{join, update_instance}; use crate::traft::ConnectionPool; +use crate::traft::CurrentGradeVariant; use crate::traft::LogicalClock; use crate::traft::Op; +use crate::traft::OpResult; use crate::traft::RaftSpaceAccess; use crate::traft::Topology; -use super::OpResult; -use super::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant}; - type RawNode = raft::RawNode<RaftSpaceAccess>; ::tarantool::define_str_enum! { @@ -125,7 +116,7 @@ pub struct Node { // This is a concious decision. // `self.raft_id()` is used in Rust API, and // `self.status()` is mostly useful in Lua API. - raft_id: RaftId, + pub(crate) raft_id: RaftId, node_impl: Rc<Mutex<NodeImpl>>, pub(crate) storage: Clusterwide, @@ -159,11 +150,11 @@ impl Node { let node_impl = Rc::new(Mutex::new(node_impl)); - let conf_change_loop_fn = { + let governor_loop_fn = { let status = status.clone(); let storage = storage.clone(); let raft_storage = raft_storage.clone(); - move || raft_conf_change_loop(status, storage, raft_storage) + move || governor_loop(status, storage, raft_storage) }; let node = Node { @@ -171,7 +162,7 @@ impl Node { main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields _conf_change_loop: fiber::Builder::new() .name("governor_loop") - .proc(conf_change_loop_fn) + .proc(governor_loop_fn) .start() .unwrap(), node_impl, @@ -300,7 +291,7 @@ impl Node { /// Only the conf_change_loop on a leader is eligible to call this function. /// /// **This function yields** - fn propose_conf_change_and_wait( + pub(crate) fn propose_conf_change_and_wait( &self, term: RaftTerm, conf_change: raft::ConfChangeV2, @@ -980,703 +971,6 @@ impl Drop for MainLoop { } } -fn raft_conf_change_loop( - status: Rc<Cell<Status>>, - storage: Clusterwide, - raft_storage: RaftSpaceAccess, -) { - let mut pool = ConnectionPool::builder(storage.clone()) - .call_timeout(Duration::from_secs(1)) - .connect_timeout(Duration::from_millis(500)) - .inactivity_timeout(Duration::from_secs(60)) - .build(); - - // TODO: don't hardcode this - const SYNC_TIMEOUT: Duration = Duration::from_secs(10); - - 'governor: loop { - if !status.get().raft_state.is_leader() { - event::wait(Event::StatusChanged).expect("Events system must be initialized"); - continue 'governor; - } - - let instances = storage.instances.all_instances().unwrap(); - let term = status.get().term; - let cluster_id = raft_storage.cluster_id().unwrap().unwrap(); - let node = global().expect("must be initialized"); - - //////////////////////////////////////////////////////////////////////// - // conf change - let voters = raft_storage.voters().unwrap().unwrap_or_default(); - let learners = raft_storage.learners().unwrap().unwrap_or_default(); - if let Some(conf_change) = raft_conf_change(&instances, &voters, &learners) { - // main_loop gives the warranty that every ProposeConfChange - // will sometimes be handled and there's no need in timeout. - // It also guarantees that the notification will arrive only - // after the node leaves the joint state. - tlog!(Info, "proposing conf_change"; "cc" => ?conf_change); - if let Err(e) = node.propose_conf_change_and_wait(term, conf_change) { - tlog!(Warning, "failed proposing conf_change: {e}"); - fiber::sleep(Duration::from_secs(1)); - } - continue 'governor; - } - - //////////////////////////////////////////////////////////////////////// - // offline/expel - let to_offline = instances - .iter() - .filter(|instance| instance.current_grade != CurrentGradeVariant::Offline) - // TODO: process them all, not just the first one - .find(|instance| { - let (target, current) = ( - instance.target_grade.variant, - instance.current_grade.variant, - ); - matches!(target, TargetGradeVariant::Offline) - || !matches!(current, CurrentGradeVariant::Expelled) - && matches!(target, TargetGradeVariant::Expelled) - }); - if let Some(instance) = to_offline { - tlog!( - Info, - "processing {} {} -> {}", - instance.instance_id, - instance.current_grade, - instance.target_grade - ); - - // transfer leadership, if we're the one who goes offline - if instance.raft_id == node.raft_id { - if let Some(new_leader) = maybe_responding(&instances).find(|instance| { - // FIXME: linear search - voters.contains(&instance.raft_id) - }) { - tlog!( - Info, - "transferring leadership to {}", - new_leader.instance_id - ); - node.transfer_leadership_and_yield(new_leader.raft_id); - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - } - - let replicaset_id = &instance.replicaset_id; - // choose a new replicaset master if needed - let res = (|| -> traft::Result<_> { - let replicaset = storage.replicasets.get(replicaset_id)?; - if replicaset - .map(|r| r.master_id == instance.instance_id) - .unwrap_or(false) - { - let new_master = - maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id); - if let Some(instance) = new_master { - let mut ops = UpdateOps::new(); - ops.assign("master_id", &instance.instance_id)?; - - let op = - OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?; - tlog!(Info, "proposing replicaset master change"; "op" => ?op); - // TODO: don't hard code the timeout - node.propose_and_wait(op, Duration::from_secs(3))??; - } else { - tlog!(Info, "skip proposing replicaset master change"); - } - } - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed proposing replicaset master change: {e}"); - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - - // reconfigure vshard storages and routers - let res = (|| -> traft::Result<_> { - let commit = raft_storage.commit()?.unwrap(); - let reqs = maybe_responding(&instances) - .filter(|instance| { - instance.current_grade == CurrentGradeVariant::ShardingInitialized - || instance.current_grade == CurrentGradeVariant::Online - }) - .map(|instance| { - tlog!(Info, - "calling rpc::sharding"; - "instance_id" => %instance.instance_id - ); - ( - instance.instance_id.clone(), - sharding::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - bootstrap: false, - }, - ) - }); - // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - for (_, resp) in res { - let sharding::Response {} = resp?; - } - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed calling rpc::sharding: {e}"); - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - - // update instance's CurrentGrade - let req = - update_instance::Request::new(instance.instance_id.clone(), cluster_id.clone()) - .with_current_grade(instance.target_grade.into()); - tlog!(Info, - "handling update_instance::Request"; - "current_grade" => %req.current_grade.expect("just set"), - "instance_id" => %req.instance_id, - ); - if let Err(e) = node.handle_update_instance_request_and_wait(req) { - tlog!(Warning, - "failed handling update_instance::Request: {e}"; - "instance_id" => %instance.instance_id, - ); - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - - let replicaset_instances = storage - .instances - .replicaset_instances(replicaset_id) - .expect("storage error") - .filter(|instance| !instance.is_expelled()) - .collect::<Vec<_>>(); - let may_respond = replicaset_instances - .iter() - .filter(|instance| instance.may_respond()); - // Check if it makes sense to call box.ctl.promote, - // otherwise we risk unpredictable delays - if replicaset_instances.len() / 2 + 1 > may_respond.count() { - tlog!(Warning, - "replicaset lost quorum"; - "replicaset_id" => %replicaset_id, - ); - continue 'governor; - } - - let res = (|| -> traft::Result<_> { - // Promote the replication leader again - // because of tarantool bugs - if let Some(replicaset) = storage.replicasets.get(replicaset_id)? { - tlog!(Info, - "calling rpc::replication::promote"; - "instance_id" => %replicaset.master_id - ); - let commit = raft_storage.commit()?.unwrap(); - pool.call_and_wait_timeout( - &replicaset.master_id, - replication::promote::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - }, - // TODO: don't hard code timeout - Duration::from_secs(3), - )?; - } - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, - "failed calling rpc::replication::promote: {e}"; - "replicaset_id" => %replicaset_id, - ); - } - - continue 'governor; - } - - //////////////////////////////////////////////////////////////////////// - // raft sync - // TODO: putting each stage in a different function - // will make the control flow more readable - let to_sync = instances.iter().find(|instance| { - instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online) - || instance.is_reincarnated() - }); - if let Some(instance) = to_sync { - let (rx, tx) = fiber::Channel::new(1).into_clones(); - let commit = raft_storage.commit().unwrap().unwrap(); - pool.call( - &instance.raft_id, - sync::Request { - commit, - timeout: SYNC_TIMEOUT, - }, - move |res| tx.send(res).expect("mustn't fail"), - ) - .expect("shouldn't fail"); - let res = rx.recv().expect("ought not fail"); - let res = res.and_then(|sync::Response { commit }| { - // TODO: change `Info` to `Debug` - tlog!(Info, "instance synced"; - "commit" => commit, - "instance_id" => &*instance.instance_id, - ); - - let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::raft_synced( - instance.target_grade.incarnation, - )); - global() - .expect("can't be deinitialized") - .handle_update_instance_request_and_wait(req) - }); - match res { - Ok(()) => { - tlog!(Info, "raft sync processed"); - } - Err(e) => { - tlog!(Warning, "raft sync failed: {e}"; - "instance_id" => %instance.instance_id, - ); - - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)) - .unwrap(); - } - } - - continue 'governor; - } - - //////////////////////////////////////////////////////////////////////// - // replication - let to_replicate = instances - .iter() - // TODO: find all such instances in a given replicaset, - // not just the first one - .find(|instance| { - instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online) - }); - if let Some(instance) = to_replicate { - let replicaset_id = &instance.replicaset_id; - let replicaset_iids = maybe_responding(&instances) - .filter(|instance| instance.replicaset_id == replicaset_id) - .map(|instance| instance.instance_id.clone()) - .collect::<Vec<_>>(); - - let res = (|| -> traft::Result<_> { - let commit = raft_storage.commit()?.unwrap(); - let reqs = replicaset_iids - .iter() - .cloned() - .zip(repeat(replication::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - replicaset_instances: replicaset_iids.clone(), - replicaset_id: replicaset_id.clone(), - })); - // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - - for (instance_id, resp) in res { - let replication::Response { lsn } = resp?; - // TODO: change `Info` to `Debug` - tlog!(Info, "configured replication with instance"; - "instance_id" => %instance_id, - "lsn" => lsn, - ); - } - - let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::replicated( - instance.target_grade.incarnation, - )); - node.handle_update_instance_request_and_wait(req)?; - - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed to configure replication: {e}"); - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - - let res = (|| -> Result<_, Error> { - let master_id = - if let Some(replicaset) = storage.replicasets.get(&instance.replicaset_id)? { - Cow::Owned(replicaset.master_id) - } else { - let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; - let req = traft::OpDML::insert( - ClusterwideSpace::Replicaset, - &traft::Replicaset { - replicaset_id: instance.replicaset_id.clone(), - replicaset_uuid: instance.replicaset_uuid.clone(), - master_id: instance.instance_id.clone(), - weight: if vshard_bootstrapped { 0. } else { 1. }, - current_schema_version: 0, - }, - )?; - // TODO: don't hard code the timeout - node.propose_and_wait(req, Duration::from_secs(3))??; - Cow::Borrowed(&instance.instance_id) - }; - - let commit = raft_storage.commit()?.unwrap(); - pool.call_and_wait_timeout( - &*master_id, - replication::promote::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - }, - // TODO: don't hard code timeout - Duration::from_secs(3), - )?; - tlog!(Debug, "promoted replicaset master"; - "instance_id" => %master_id, - "replicaset_id" => %instance.replicaset_id, - ); - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed to promote replicaset master: {e}"; - "replicaset_id" => %replicaset_id, - ); - } - - tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id); - - continue 'governor; - } - - //////////////////////////////////////////////////////////////////////// - // init sharding - let to_shard = instances.iter().find(|instance| { - instance.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online) - }); - if let Some(instance) = to_shard { - let res = (|| -> traft::Result<()> { - let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; - let commit = raft_storage.commit()?.unwrap(); - let reqs = maybe_responding(&instances).map(|instance| { - ( - instance.instance_id.clone(), - sharding::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id, - }, - ) - }); - // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - - for (instance_id, resp) in res { - let sharding::Response {} = resp?; - - // TODO: change `Info` to `Debug` - tlog!(Info, "initialized sharding with instance"; - "instance_id" => %instance_id, - ); - } - - let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::sharding_initialized( - instance.target_grade.incarnation, - )); - node.handle_update_instance_request_and_wait(req)?; - - if !vshard_bootstrapped { - // TODO: if this fails, it will only rerun next time vshard - // gets reconfigured - node.propose_and_wait( - traft::OpDML::replace( - ClusterwideSpace::Property, - &(PropertyName::VshardBootstrapped, true), - )?, - // TODO: don't hard code the timeout - Duration::from_secs(3), - )??; - } - - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed to initialize sharding: {e}"); - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - - let res = (|| -> Result<(), Error> { - // Promote the replication leaders again - // because of tarantool bugs - let replicasets = storage.replicasets.iter()?; - let masters = replicasets.map(|r| r.master_id).collect::<HashSet<_>>(); - let commit = raft_storage.commit()?.unwrap(); - let reqs = maybe_responding(&instances) - .filter(|instance| masters.contains(&instance.instance_id)) - .map(|instance| instance.instance_id.clone()) - .zip(repeat(replication::promote::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - })); - // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - for (instance_id, resp) in res { - resp?; - tlog!(Debug, "promoted replicaset master"; "instance_id" => %instance_id); - } - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed to promote replicaset masters: {e}"); - } - - tlog!(Info, "sharding is initialized"); - - continue 'governor; - } - - //////////////////////////////////////////////////////////////////////// - // sharding weights - let to_update_weights = instances.iter().find(|instance| { - instance.has_grades( - CurrentGradeVariant::ShardingInitialized, - TargetGradeVariant::Online, - ) - }); - if let Some(instance) = to_update_weights { - let res = if let Some(added_weights) = - get_weight_changes(maybe_responding(&instances), &storage) - { - (|| -> traft::Result<()> { - for (replicaset_id, weight) in added_weights { - let mut ops = UpdateOps::new(); - ops.assign("weight", weight)?; - node.propose_and_wait( - traft::OpDML::update( - ClusterwideSpace::Replicaset, - &[replicaset_id], - ops, - )?, - // TODO: don't hard code the timeout - Duration::from_secs(3), - )??; - } - - let instance_ids = - maybe_responding(&instances).map(|instance| instance.instance_id.clone()); - let commit = raft_storage.commit()?.unwrap(); - let reqs = instance_ids.zip(repeat(sharding::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - bootstrap: false, - })); - // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - - for (instance_id, resp) in res { - resp?; - // TODO: change `Info` to `Debug` - tlog!(Info, "instance is online"; "instance_id" => %instance_id); - } - - let req = - update_instance::Request::new(instance.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::online( - instance.target_grade.incarnation, - )); - node.handle_update_instance_request_and_wait(req)?; - Ok(()) - })() - } else { - (|| -> traft::Result<()> { - let to_online = instances.iter().filter(|instance| { - instance.has_grades( - CurrentGradeVariant::ShardingInitialized, - TargetGradeVariant::Online, - ) - }); - for Instance { - instance_id, - target_grade, - .. - } in to_online - { - let cluster_id = cluster_id.clone(); - let req = update_instance::Request::new(instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::online(target_grade.incarnation)); - node.handle_update_instance_request_and_wait(req)?; - // TODO: change `Info` to `Debug` - tlog!(Info, "instance is online"; "instance_id" => %instance_id); - } - Ok(()) - })() - }; - if let Err(e) = res { - tlog!(Warning, "updating sharding weights failed: {e}"); - - // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; - } - - tlog!(Info, "sharding is configured"); - - continue 'governor; - } - - //////////////////////////////////////////////////////////////////////// - // applying migrations - let desired_schema_version = storage.state.desired_schema_version().unwrap(); - let replicasets = storage.replicasets.iter().unwrap().collect::<Vec<_>>(); - let mut migrations = storage.migrations.iter().unwrap().collect::<Vec<_>>(); - let commit = raft_storage.commit().unwrap().unwrap(); - for (mid, rids) in waiting_migrations(&mut migrations, &replicasets, desired_schema_version) - { - let migration = storage.migrations.get(mid).unwrap().unwrap(); - for rid in rids { - let replicaset = storage - .replicasets - .get(rid.to_string().as_str()) - .unwrap() - .unwrap(); - let instance = storage.instances.get(&replicaset.master_id).unwrap(); - let req = rpc::migration::apply::Request { - term, - commit, - timeout: SYNC_TIMEOUT, - migration_id: migration.id, - }; - let res = pool.call_and_wait(&instance.raft_id, req); - match res { - Ok(_) => { - let mut ops = UpdateOps::new(); - ops.assign("current_schema_version", migration.id).unwrap(); - let op = OpDML::update( - ClusterwideSpace::Replicaset, - &[replicaset.replicaset_id.clone()], - ops, - ) - .unwrap(); - node.propose_and_wait(op, Duration::MAX).unwrap().unwrap(); - tlog!( - Info, - "Migration {0} applied to replicaset {1}", - migration.id, - replicaset.replicaset_id - ); - } - Err(e) => { - tlog!( - Warning, - "Could not apply migration {0} to replicaset {1}, error: {2}", - migration.id, - replicaset.replicaset_id, - e - ); - continue 'governor; - } - } - } - } - event::broadcast(Event::MigrateDone); - - event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged]) - .expect("Events system must be initialized"); - } - - #[allow(clippy::type_complexity)] - fn call_all<R, I>( - pool: &mut ConnectionPool, - reqs: impl IntoIterator<Item = (I, R)>, - timeout: Duration, - ) -> traft::Result<Vec<(I, traft::Result<R::Response>)>> - where - R: traft::rpc::Request, - I: traft::network::IdOfInstance + 'static, - { - // TODO: this crap is only needed to wait until results of all - // the calls are ready. There are several ways to rafactor this: - // - we could use a std-style channel that unblocks the reading end - // once all the writing ends have dropped - // (fiber::Channel cannot do that for now) - // - using the std Futures we could use futures::join! - // - // Those things aren't implemented yet, so this is what we do - let reqs = reqs.into_iter().collect::<Vec<_>>(); - if reqs.is_empty() { - return Ok(vec![]); - } - static mut SENT_COUNT: usize = 0; - unsafe { SENT_COUNT = 0 }; - let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones(); - let instance_count = reqs.len(); - let (rx, tx) = fiber::Channel::new(instance_count as _).into_clones(); - for (id, req) in reqs { - let tx = tx.clone(); - let cond_tx = cond_tx.clone(); - let id_copy = id.clone(); - pool.call(&id, req, move |res| { - tx.send((id_copy, res)).expect("mustn't fail"); - unsafe { SENT_COUNT += 1 }; - if unsafe { SENT_COUNT } == instance_count { - cond_tx.signal() - } - }) - .expect("shouldn't fail"); - } - // TODO: don't hard code timeout - if !cond_rx.wait_timeout(timeout) { - return Err(Error::Timeout); - } - - Ok(rx.into_iter().take(instance_count).collect()) - } - - #[inline(always)] - fn get_weight_changes<'p>( - instances: impl IntoIterator<Item = &'p Instance>, - storage: &Clusterwide, - ) -> Option<ReplicasetWeights> { - let replication_factor = storage.state.replication_factor().expect("storage error"); - let replicaset_weights = storage.replicasets.weights().expect("storage error"); - let mut replicaset_sizes = HashMap::new(); - let mut weight_changes = HashMap::new(); - for instance @ Instance { replicaset_id, .. } in instances { - if !instance.may_respond() { - continue; - } - let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0); - *replicaset_size += 1; - if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. { - weight_changes.entry(replicaset_id.clone()).or_insert(1.); - } - } - (!weight_changes.is_empty()).then_some(weight_changes) - } - - #[inline(always)] - fn maybe_responding(instances: &[Instance]) -> impl Iterator<Item = &Instance> { - instances.iter().filter(|instance| instance.may_respond()) - } -} - static mut RAFT_NODE: Option<Box<Node>> = None; pub fn set_global(node: Node) { diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs index 3bb940139d..649e061dc5 100644 --- a/src/traft/raft_storage.rs +++ b/src/traft/raft_storage.rs @@ -130,9 +130,9 @@ impl RaftSpaceAccess { /// Node generation i.e. the number of restarts. pub fn gen(&self) -> _<u64>; - pub(super) fn term(&self) -> _<RaftTerm>; + pub(crate) fn term(&self) -> _<RaftTerm>; fn vote(&self) -> _<RaftId>; - pub(super) fn commit(&self) -> _<RaftIndex>; + pub(crate) fn commit(&self) -> _<RaftIndex>; pub fn applied(&self) -> _<RaftIndex>; pub(crate) fn voters(&self) -> _<Vec<RaftId>>; -- GitLab