Skip to content
Snippets Groups Projects
Commit 7060a2af authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor(governor): plan for replication

parent 5e52cfa7
No related branches found
No related tags found
1 merge request!430Refactor/governor/action plan
......@@ -248,6 +248,50 @@ impl Loop {
}
}
Plan::Replication(Replication { targets, rpc, req }) => {
governor_step! {
"configuring replication"
async {
let mut fs = vec![];
for instance_id in targets {
tlog!(Info, "calling rpc::replication"; "instance_id" => %instance_id);
let resp = pool.call(instance_id, &rpc)?;
fs.push(async move {
match resp.await {
Ok(resp) => {
tlog!(Info, "configured replication with instance";
"instance_id" => %instance_id,
"lsn" => resp.lsn,
);
Ok(())
}
Err(e) => {
tlog!(Warning, "failed calling rpc::replication: {e}";
"instance_id" => %instance_id
);
Err(e)
}
}
});
}
// TODO: don't hard code timeout
try_join_all(fs).timeout(Duration::from_secs(3)).await??
}
}
let instance_id = req.instance_id.clone();
let current_grade = req.current_grade.expect("must be set");
governor_step! {
"handling instance grade change" [
"instance_id" => %instance_id,
"current_grade" => %current_grade,
]
async {
node.handle_update_instance_request_and_wait(req)?
}
}
}
Plan::None => {
tlog!(Info, "nothing to do");
did_something = false;
......@@ -258,65 +302,6 @@ impl Loop {
return Continue;
}
////////////////////////////////////////////////////////////////////////
// replication
let to_replicate = instances
.iter()
// TODO: find all such instances in a given replicaset,
// not just the first one
.find(|instance| {
instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
});
if let Some(instance) = to_replicate {
let replicaset_id = &instance.replicaset_id;
let replicaset_iids = maybe_responding(instances)
.filter(|instance| instance.replicaset_id == replicaset_id)
.map(|instance| instance.instance_id.clone())
.collect::<Vec<_>>();
let res: Result<_> = async {
let commit = raft_storage.commit()?.unwrap();
let reqs = replicaset_iids
.iter()
.cloned()
.zip(repeat(replication::Request {
term,
commit,
timeout: Self::SYNC_TIMEOUT,
}));
// TODO: don't hard code timeout
let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
for (instance_id, resp) in res {
let replication::Response { lsn } = resp?;
// TODO: change `Info` to `Debug`
tlog!(Info, "configured replication with instance";
"instance_id" => %instance_id,
"lsn" => lsn,
);
}
let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::replicated(
instance.target_grade.incarnation,
));
node.handle_update_instance_request_and_wait(req)?;
Ok(())
}
.await;
if let Err(e) = res {
tlog!(Warning, "failed to configure replication: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
return Continue;
}
tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id);
return Continue;
}
////////////////////////////////////////////////////////////////////////
// init sharding
let to_shard = instances.iter().find(|instance| {
......@@ -731,6 +716,37 @@ fn action_plan<'i>(
return Ok(CreateReplicaset { master_id, replicaset_id, rpc, op }.into());
}
////////////////////////////////////////////////////////////////////////////
// replication
let to_replicate = instances
.iter()
// TODO: find all such instances in a given replicaset,
// not just the first one
.find(|instance| {
instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
});
if let Some(Instance {
instance_id,
replicaset_id,
target_grade,
..
}) = to_replicate
{
let targets = maybe_responding(instances)
.filter(|instance| instance.replicaset_id == replicaset_id)
.map(|instance| &instance.instance_id)
.collect();
let rpc = replication::Request {
term,
commit,
timeout: Loop::SYNC_TIMEOUT,
};
let req = update_instance::Request::new(instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::replicated(target_grade.incarnation));
return Ok(Replication { targets, rpc, req }.into());
}
Ok(Plan::None)
}
......@@ -886,6 +902,12 @@ mod actions {
pub op: OpDML,
}
pub struct Replication<'i> {
pub targets: Vec<&'i InstanceId>,
pub rpc: replication::Request,
pub req: update_instance::Request,
}
pub enum Plan<'i> {
None,
ConfChange(ConfChangeV2),
......@@ -894,6 +916,7 @@ mod actions {
ReconfigureShardingAndDowngrade(ReconfigureShardingAndDowngrade<'i>),
RaftSync(RaftSync<'i>),
CreateReplicaset(CreateReplicaset<'i>),
Replication(Replication<'i>),
}
impl From<ConfChangeV2> for Plan<'_> {
......@@ -931,4 +954,10 @@ mod actions {
Self::CreateReplicaset(a)
}
}
impl<'i> From<Replication<'i>> for Plan<'i> {
fn from(a: Replication<'i>) -> Self {
Self::Replication(a)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment