diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 3d525509d30376d3789872db62f9e1950a7d4f2d..1e7dad4f16d28c1171edb606fdd13b5f3ff0f0a1 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -10,6 +10,7 @@ use ::tarantool::space::UpdateOps; use ::tarantool::util::IntoClones as _; use crate::event::{self, Event}; +use crate::r#loop::FlowControl::{self, Continue}; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::tlog; use crate::traft::error::Error; @@ -31,24 +32,20 @@ pub(crate) mod migration; pub(crate) use cc::raft_conf_change; pub(crate) use migration::waiting_migrations; -pub(crate) fn governor_loop( - status: Rc<Cell<Status>>, - storage: Clusterwide, - raft_storage: RaftSpaceAccess, -) { - let mut pool = ConnectionPool::builder(storage.clone()) - .call_timeout(Duration::from_secs(1)) - .connect_timeout(Duration::from_millis(500)) - .inactivity_timeout(Duration::from_secs(60)) - .build(); - - // TODO: don't hardcode this +impl Loop { const SYNC_TIMEOUT: Duration = Duration::from_secs(10); - 'governor: loop { + async fn iter_fn( + Args { + status, + storage, + raft_storage, + }: &Args, + State { pool }: &mut State, + ) -> FlowControl { if !status.get().raft_state.is_leader() { event::wait(Event::StatusChanged).expect("Events system must be initialized"); - continue 'governor; + return Continue; } let instances = storage.instances.all_instances().unwrap(); @@ -70,7 +67,7 @@ pub(crate) fn governor_loop( tlog!(Warning, "failed proposing conf_change: {e}"); fiber::sleep(Duration::from_secs(1)); } - continue 'governor; + return Continue; } //////////////////////////////////////////////////////////////////////// @@ -110,7 +107,7 @@ pub(crate) fn governor_loop( ); node.transfer_leadership_and_yield(new_leader.raft_id); event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } } @@ -143,7 +140,7 @@ pub(crate) fn governor_loop( tlog!(Warning, "failed proposing replicaset master change: {e}"); // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } // reconfigure vshard storages and routers @@ -164,13 +161,13 @@ pub(crate) fn governor_loop( sharding::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, bootstrap: false, }, ) }); // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + let res = call_all(pool, reqs, Duration::from_secs(3))?; for (_, resp) in res { let sharding::Response {} = resp?; } @@ -180,12 +177,12 @@ pub(crate) fn governor_loop( tlog!(Warning, "failed calling rpc::sharding: {e}"); // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } // update instance's CurrentGrade let req = - update_instance::Request::new(instance.instance_id.clone(), cluster_id.clone()) + update_instance::Request::new(instance.instance_id.clone(), cluster_id) .with_current_grade(instance.target_grade.into()); tlog!(Info, "handling update_instance::Request"; @@ -199,7 +196,7 @@ pub(crate) fn governor_loop( ); // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } let replicaset_instances = storage @@ -218,7 +215,7 @@ pub(crate) fn governor_loop( "replicaset lost quorum"; "replicaset_id" => %replicaset_id, ); - continue 'governor; + return Continue; } let res = (|| -> Result<_> { @@ -235,7 +232,7 @@ pub(crate) fn governor_loop( replication::promote::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, }, // TODO: don't hard code timeout Duration::from_secs(3), @@ -250,7 +247,7 @@ pub(crate) fn governor_loop( ); } - continue 'governor; + return Continue; } //////////////////////////////////////////////////////////////////////// @@ -268,7 +265,7 @@ pub(crate) fn governor_loop( &instance.raft_id, sync::Request { commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, }, move |res| tx.send(res).expect("mustn't fail"), ) @@ -304,7 +301,7 @@ pub(crate) fn governor_loop( } } - continue 'governor; + return Continue; } //////////////////////////////////////////////////////////////////////// @@ -331,12 +328,12 @@ pub(crate) fn governor_loop( .zip(repeat(replication::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, replicaset_instances: replicaset_iids.clone(), replicaset_id: replicaset_id.clone(), })); // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + let res = call_all(pool, reqs, Duration::from_secs(3))?; for (instance_id, resp) in res { let replication::Response { lsn } = resp?; @@ -359,7 +356,7 @@ pub(crate) fn governor_loop( tlog!(Warning, "failed to configure replication: {e}"); // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } let res = (|| -> Result<_> { @@ -389,7 +386,7 @@ pub(crate) fn governor_loop( replication::promote::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, }, // TODO: don't hard code timeout Duration::from_secs(3), @@ -408,7 +405,7 @@ pub(crate) fn governor_loop( tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id); - continue 'governor; + return Continue; } //////////////////////////////////////////////////////////////////////// @@ -426,13 +423,13 @@ pub(crate) fn governor_loop( sharding::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id, }, ) }); // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + let res = call_all(pool, reqs, Duration::from_secs(3))?; for (instance_id, resp) in res { let sharding::Response {} = resp?; @@ -468,7 +465,7 @@ pub(crate) fn governor_loop( tlog!(Warning, "failed to initialize sharding: {e}"); // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } let res = (|| -> Result<()> { @@ -483,10 +480,10 @@ pub(crate) fn governor_loop( .zip(repeat(replication::promote::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, })); // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + let res = call_all(pool, reqs, Duration::from_secs(3))?; for (instance_id, resp) in res { resp?; tlog!(Debug, "promoted replicaset master"; "instance_id" => %instance_id); @@ -499,7 +496,7 @@ pub(crate) fn governor_loop( tlog!(Info, "sharding is initialized"); - continue 'governor; + return Continue; } //////////////////////////////////////////////////////////////////////// @@ -512,7 +509,7 @@ pub(crate) fn governor_loop( }); if let Some(instance) = to_update_weights { let res = if let Some(added_weights) = - get_weight_changes(maybe_responding(&instances), &storage) + get_weight_changes(maybe_responding(&instances), storage) { (|| -> Result<()> { for (replicaset_id, weight) in added_weights { @@ -531,11 +528,11 @@ pub(crate) fn governor_loop( let reqs = instance_ids.zip(repeat(sharding::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, bootstrap: false, })); // TODO: don't hard code timeout - let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; + let res = call_all(pool, reqs, Duration::from_secs(3))?; for (instance_id, resp) in res { resp?; @@ -580,12 +577,12 @@ pub(crate) fn governor_loop( // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); - continue 'governor; + return Continue; } tlog!(Info, "sharding is configured"); - continue 'governor; + return Continue; } //////////////////////////////////////////////////////////////////////// @@ -607,7 +604,7 @@ pub(crate) fn governor_loop( let req = rpc::migration::apply::Request { term, commit, - timeout: SYNC_TIMEOUT, + timeout: Self::SYNC_TIMEOUT, migration_id: migration.id, }; let res = pool.call_and_wait(&instance.raft_id, req); @@ -637,7 +634,7 @@ pub(crate) fn governor_loop( replicaset.replicaset_id, e ); - continue 'governor; + return Continue; } } } @@ -646,7 +643,47 @@ pub(crate) fn governor_loop( event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged]) .expect("Events system must be initialized"); + + Continue } + + pub fn start( + status: Rc<Cell<Status>>, + storage: Clusterwide, + raft_storage: RaftSpaceAccess, + ) -> Self { + let args = Args { + status, + storage, + raft_storage, + }; + + let state = State { + pool: ConnectionPool::builder(args.storage.clone()) + .call_timeout(Duration::from_secs(1)) + .connect_timeout(Duration::from_millis(500)) + .inactivity_timeout(Duration::from_secs(60)) + .build(), + }; + + Self { + _loop: crate::loop_start!("governor_loop", Self::iter_fn, args, state), + } + } +} + +pub struct Loop { + _loop: Option<fiber::UnitJoinHandle<'static>>, +} + +struct Args { + status: Rc<Cell<Status>>, + storage: Clusterwide, + raft_storage: RaftSpaceAccess, +} + +struct State { + pool: ConnectionPool, } #[allow(clippy::type_complexity)] diff --git a/src/traft/node.rs b/src/traft/node.rs index 78b4913d95b25f950845f9625b38a8d64b45a25e..097e9f561d83ffd29c134114c29d11c1e973352f 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -25,7 +25,7 @@ use std::rc::Rc; use std::time::Duration; use std::time::Instant; -use crate::governor::governor_loop; +use crate::governor; use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; @@ -122,7 +122,7 @@ pub struct Node { pub(crate) storage: Clusterwide, pub(crate) raft_storage: RaftSpaceAccess, main_loop: MainLoop, - _conf_change_loop: fiber::UnitJoinHandle<'static>, + _governor_loop: governor::Loop, status: Rc<Cell<Status>>, } @@ -150,21 +150,14 @@ impl Node { let node_impl = Rc::new(Mutex::new(node_impl)); - let governor_loop_fn = { - let status = status.clone(); - let storage = storage.clone(); - let raft_storage = raft_storage.clone(); - move || governor_loop(status, storage, raft_storage) - }; - let node = Node { raft_id, main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields - _conf_change_loop: fiber::Builder::new() - .name("governor_loop") - .proc(governor_loop_fn) - .start() - .unwrap(), + _governor_loop: governor::Loop::start( + status.clone(), + storage.clone(), + raft_storage.clone(), + ), node_impl, storage, raft_storage,