Skip to content
Snippets Groups Projects

Refactor/governor

Merged Georgy Moshkin requested to merge refactor/governor into master
2 files
+ 90
61
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 83
47
@@ -10,6 +10,7 @@ use ::tarantool::space::UpdateOps;
use ::tarantool::util::IntoClones as _;
use crate::event::{self, Event};
use crate::r#loop::FlowControl::{self, Continue};
use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
use crate::tlog;
use crate::traft::error::Error;
@@ -31,24 +32,20 @@ pub(crate) mod migration;
pub(crate) use cc::raft_conf_change;
pub(crate) use migration::waiting_migrations;
pub(crate) fn governor_loop(
status: Rc<Cell<Status>>,
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
) {
let mut pool = ConnectionPool::builder(storage.clone())
.call_timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_millis(500))
.inactivity_timeout(Duration::from_secs(60))
.build();
// TODO: don't hardcode this
impl Loop {
const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
'governor: loop {
async fn iter_fn(
Args {
status,
storage,
raft_storage,
}: &Args,
State { pool }: &mut State,
) -> FlowControl {
if !status.get().raft_state.is_leader() {
event::wait(Event::StatusChanged).expect("Events system must be initialized");
continue 'governor;
return Continue;
}
let instances = storage.instances.all_instances().unwrap();
@@ -70,7 +67,7 @@ pub(crate) fn governor_loop(
tlog!(Warning, "failed proposing conf_change: {e}");
fiber::sleep(Duration::from_secs(1));
}
continue 'governor;
return Continue;
}
////////////////////////////////////////////////////////////////////////
@@ -110,7 +107,7 @@ pub(crate) fn governor_loop(
);
node.transfer_leadership_and_yield(new_leader.raft_id);
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
}
@@ -143,7 +140,7 @@ pub(crate) fn governor_loop(
tlog!(Warning, "failed proposing replicaset master change: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
// reconfigure vshard storages and routers
@@ -164,13 +161,13 @@ pub(crate) fn governor_loop(
sharding::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
bootstrap: false,
},
)
});
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
let res = call_all(pool, reqs, Duration::from_secs(3))?;
for (_, resp) in res {
let sharding::Response {} = resp?;
}
@@ -180,13 +177,12 @@ pub(crate) fn governor_loop(
tlog!(Warning, "failed calling rpc::sharding: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
// update instance's CurrentGrade
let req =
update_instance::Request::new(instance.instance_id.clone(), cluster_id.clone())
.with_current_grade(instance.target_grade.into());
let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id)
.with_current_grade(instance.target_grade.into());
tlog!(Info,
"handling update_instance::Request";
"current_grade" => %req.current_grade.expect("just set"),
@@ -199,7 +195,7 @@ pub(crate) fn governor_loop(
);
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
let replicaset_instances = storage
@@ -218,7 +214,7 @@ pub(crate) fn governor_loop(
"replicaset lost quorum";
"replicaset_id" => %replicaset_id,
);
continue 'governor;
return Continue;
}
let res = (|| -> Result<_> {
@@ -235,7 +231,7 @@ pub(crate) fn governor_loop(
replication::promote::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
},
// TODO: don't hard code timeout
Duration::from_secs(3),
@@ -250,7 +246,7 @@ pub(crate) fn governor_loop(
);
}
continue 'governor;
return Continue;
}
////////////////////////////////////////////////////////////////////////
@@ -268,7 +264,7 @@ pub(crate) fn governor_loop(
&instance.raft_id,
sync::Request {
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
},
move |res| tx.send(res).expect("mustn't fail"),
)
@@ -304,7 +300,7 @@ pub(crate) fn governor_loop(
}
}
continue 'governor;
return Continue;
}
////////////////////////////////////////////////////////////////////////
@@ -331,12 +327,12 @@ pub(crate) fn governor_loop(
.zip(repeat(replication::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
replicaset_instances: replicaset_iids.clone(),
replicaset_id: replicaset_id.clone(),
}));
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
let res = call_all(pool, reqs, Duration::from_secs(3))?;
for (instance_id, resp) in res {
let replication::Response { lsn } = resp?;
@@ -359,7 +355,7 @@ pub(crate) fn governor_loop(
tlog!(Warning, "failed to configure replication: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
let res = (|| -> Result<_> {
@@ -389,7 +385,7 @@ pub(crate) fn governor_loop(
replication::promote::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
},
// TODO: don't hard code timeout
Duration::from_secs(3),
@@ -408,7 +404,7 @@ pub(crate) fn governor_loop(
tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id);
continue 'governor;
return Continue;
}
////////////////////////////////////////////////////////////////////////
@@ -426,13 +422,13 @@ pub(crate) fn governor_loop(
sharding::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id,
},
)
});
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
let res = call_all(pool, reqs, Duration::from_secs(3))?;
for (instance_id, resp) in res {
let sharding::Response {} = resp?;
@@ -468,7 +464,7 @@ pub(crate) fn governor_loop(
tlog!(Warning, "failed to initialize sharding: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
let res = (|| -> Result<()> {
@@ -483,10 +479,10 @@ pub(crate) fn governor_loop(
.zip(repeat(replication::promote::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
}));
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
let res = call_all(pool, reqs, Duration::from_secs(3))?;
for (instance_id, resp) in res {
resp?;
tlog!(Debug, "promoted replicaset master"; "instance_id" => %instance_id);
@@ -499,7 +495,7 @@ pub(crate) fn governor_loop(
tlog!(Info, "sharding is initialized");
continue 'governor;
return Continue;
}
////////////////////////////////////////////////////////////////////////
@@ -512,7 +508,7 @@ pub(crate) fn governor_loop(
});
if let Some(instance) = to_update_weights {
let res = if let Some(added_weights) =
get_weight_changes(maybe_responding(&instances), &storage)
get_weight_changes(maybe_responding(&instances), storage)
{
(|| -> Result<()> {
for (replicaset_id, weight) in added_weights {
@@ -531,11 +527,11 @@ pub(crate) fn governor_loop(
let reqs = instance_ids.zip(repeat(sharding::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
bootstrap: false,
}));
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
let res = call_all(pool, reqs, Duration::from_secs(3))?;
for (instance_id, resp) in res {
resp?;
@@ -580,12 +576,12 @@ pub(crate) fn governor_loop(
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
return Continue;
}
tlog!(Info, "sharding is configured");
continue 'governor;
return Continue;
}
////////////////////////////////////////////////////////////////////////
@@ -607,7 +603,7 @@ pub(crate) fn governor_loop(
let req = rpc::migration::apply::Request {
term,
commit,
timeout: SYNC_TIMEOUT,
timeout: Self::SYNC_TIMEOUT,
migration_id: migration.id,
};
let res = pool.call_and_wait(&instance.raft_id, req);
@@ -637,7 +633,7 @@ pub(crate) fn governor_loop(
replicaset.replicaset_id,
e
);
continue 'governor;
return Continue;
}
}
}
@@ -646,7 +642,47 @@ pub(crate) fn governor_loop(
event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged])
.expect("Events system must be initialized");
Continue
}
pub fn start(
status: Rc<Cell<Status>>,
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
) -> Self {
let args = Args {
status,
storage,
raft_storage,
};
let state = State {
pool: ConnectionPool::builder(args.storage.clone())
.call_timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_millis(500))
.inactivity_timeout(Duration::from_secs(60))
.build(),
};
Self {
_loop: crate::loop_start!("governor_loop", Self::iter_fn, args, state),
}
}
}
pub struct Loop {
_loop: Option<fiber::UnitJoinHandle<'static>>,
}
struct Args {
status: Rc<Cell<Status>>,
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
}
struct State {
pool: ConnectionPool,
}
#[allow(clippy::type_complexity)]
Loading