Skip to content
Snippets Groups Projects
Commit 280816eb authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor(governor): plan for creating replicaset

parent df43157a
No related branches found
No related tags found
1 merge request!430Refactor/governor/action plan
......@@ -87,6 +87,7 @@ impl Loop {
let commit = raft_storage.commit().unwrap().unwrap();
let cluster_id = raft_storage.cluster_id().unwrap().unwrap();
let node = global().expect("must be initialized");
let vshard_bootstrapped = storage.properties.vshard_bootstrapped().unwrap();
let plan = action_plan(
term,
......@@ -97,6 +98,7 @@ impl Loop {
&learners,
&replicasets,
node.raft_id,
vshard_bootstrapped,
);
let plan = unwrap_ok_or!(plan,
Err(e) => {
......@@ -218,103 +220,41 @@ impl Loop {
}
}
Plan::None => {
tlog!(Info, "nothing to do");
did_something = false;
}
}
if did_something {
return Continue;
}
////////////////////////////////////////////////////////////////////////
// create new replicaset
if let Some(to_create_replicaset) = instances
.iter()
.filter(|instance| {
instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
})
.find_map(
|instance| match storage.replicasets.get(&instance.replicaset_id) {
Err(e) => Some(Err(e)),
Ok(None) => Some(Ok(instance)),
Ok(_) => None,
},
)
{
// TODO: what if this is not actually the replicaset bootstrap leader?
let Instance {
instance_id,
Plan::CreateReplicaset(CreateReplicaset {
master_id,
replicaset_id,
replicaset_uuid,
..
} = unwrap_ok_or!(to_create_replicaset,
Err(e) => {
tlog!(Warning, "{e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)).unwrap();
return Continue;
rpc,
op,
}) => {
governor_step! {
"promoting new replicaset master" [
"master_id" => %master_id,
"replicaset_id" => %replicaset_id,
]
async {
pool.call(master_id, &rpc)?
.timeout(Duration::from_secs(3))
.await??
}
}
);
// TODO: change `Info` to `Debug`
tlog!(Info, "promoting new replicaset master";
"master_id" => %instance_id,
"replicaset_id" => %replicaset_id,
);
let res: Result<_> = async {
let req = replication::promote::Request {
term,
commit: raft_storage.commit()?.unwrap(),
timeout: Self::SYNC_TIMEOUT,
};
let replication::promote::Response {} = pool
.call(instance_id, &req)?
// TODO: don't hard code the timeout
.timeout(Duration::from_secs(3))
.await??;
Ok(())
}
.await;
if let Err(e) = res {
tlog!(Warning, "failed promoting new replicaset master: {e}";
"master_id" => %instance_id,
"replicaset_id" => %replicaset_id,
);
return Continue;
governor_step! {
"creating new replicaset" [
"replicaset_id" => %replicaset_id,
]
async {
node.propose_and_wait(op, Duration::from_secs(3))??;
}
}
}
// TODO: change `Info` to `Debug`
tlog!(Info, "creating new replicaset";
"master_id" => %instance_id,
"replicaset_id" => %replicaset_id,
);
let res: Result<_> = async {
let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?;
let req = OpDML::insert(
ClusterwideSpace::Replicaset,
&Replicaset {
replicaset_id: replicaset_id.clone(),
replicaset_uuid: replicaset_uuid.clone(),
master_id: instance_id.clone(),
weight: if vshard_bootstrapped { 0. } else { 1. },
current_schema_version: 0,
},
)?;
// TODO: don't hard code the timeout
node.propose_and_wait(req, Duration::from_secs(3))??;
Ok(())
}
.await;
if let Err(e) = res {
tlog!(Warning, "failed creating new replicaset: {e}";
"replicaset_id" => %replicaset_id,
);
return Continue;
Plan::None => {
tlog!(Info, "nothing to do");
did_something = false;
}
}
if did_something {
return Continue;
}
......@@ -645,6 +585,7 @@ fn action_plan<'i>(
learners: &[RaftId],
replicasets: &HashMap<&ReplicasetId, &'i Replicaset>,
my_raft_id: RaftId,
vshard_bootstrapped: bool,
) -> Result<Plan<'i>> {
////////////////////////////////////////////////////////////////////////////
// conf change
......@@ -758,6 +699,40 @@ fn action_plan<'i>(
return Ok(RaftSync { instance_id, rpc, req }.into());
}
////////////////////////////////////////////////////////////////////////////
// create new replicaset
let to_create_replicaset = instances
.iter()
.filter(|instance| {
instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
})
.find(|instance| replicasets.get(&instance.replicaset_id).is_none());
if let Some(Instance {
instance_id: master_id,
replicaset_id,
replicaset_uuid,
..
}) = to_create_replicaset
{
let rpc = replication::promote::Request {
term,
commit,
timeout: Loop::SYNC_TIMEOUT,
};
let op = OpDML::insert(
ClusterwideSpace::Replicaset,
&Replicaset {
replicaset_id: replicaset_id.clone(),
replicaset_uuid: replicaset_uuid.clone(),
master_id: master_id.clone(),
weight: if vshard_bootstrapped { 0. } else { 1. },
current_schema_version: 0,
},
)?;
#[rustfmt::skip]
return Ok(CreateReplicaset { master_id, replicaset_id, rpc, op }.into());
}
Ok(Plan::None)
}
......@@ -906,6 +881,13 @@ mod actions {
pub req: update_instance::Request,
}
pub struct CreateReplicaset<'i> {
pub master_id: &'i InstanceId,
pub replicaset_id: &'i ReplicasetId,
pub rpc: replication::promote::Request,
pub op: OpDML,
}
pub enum Plan<'i> {
None,
ConfChange(ConfChangeV2),
......@@ -913,6 +895,7 @@ mod actions {
TransferMastership(TransferMastership<'i>),
ReconfigureShardingAndDowngrade(ReconfigureShardingAndDowngrade<'i>),
RaftSync(RaftSync<'i>),
CreateReplicaset(CreateReplicaset<'i>),
}
impl From<ConfChangeV2> for Plan<'_> {
......@@ -944,4 +927,10 @@ mod actions {
Self::RaftSync(a)
}
}
impl<'i> From<CreateReplicaset<'i>> for Plan<'i> {
fn from(a: CreateReplicaset<'i>) -> Self {
Self::CreateReplicaset(a)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment